1use std::collections::BTreeMap;
2use std::sync::atomic::AtomicBool;
3use std::sync::{Arc, Mutex, OnceLock};
4
5use futures::StreamExt;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::format_description::well_known::Rfc3339;
9
10use crate::event_log::{
11 active_event_log, install_memory_for_current_thread, sanitize_topic_component, AnyEventLog,
12 ConsumerId, EventId, EventLog, LogEvent, Topic,
13};
14use crate::llm::vm_value_to_json;
15use crate::runtime_limits::RuntimeLimits;
16use crate::triggers::event::{ChannelEventPayload, KnownProviderPayload};
17use crate::triggers::{ProviderId, ProviderPayload, SignatureStatus, TenantId, TriggerEvent};
18use crate::value::{VmError, VmStream, VmValue};
19
/// Queue depth passed to the in-memory event logs this module creates; taken
/// from the runtime's default limits.
const CHANNEL_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Event kind stamped on channel emit records appended to the event log.
const CHANNEL_EVENT_KIND: &str = "channel.emit";
/// Header carrying the channel event id; also the key used for the idempotent append.
const IDEMPOTENCY_HEADER: &str = "harn.channel.id";
/// Header carrying the fully resolved channel name.
const NAME_HEADER: &str = "harn.channel.name";
/// Header carrying the channel scope keyword.
const SCOPE_HEADER: &str = "harn.channel.scope";
/// Header carrying the resolved scope id.
const SCOPE_ID_HEADER: &str = "harn.channel.scope_id";
/// Header carrying the emitter identity string.
const EMITTED_BY_HEADER: &str = "harn.channel.emitted_by";

// NOTE(review): the transcript constants below are consumed outside the
// visible part of this file; the descriptions are inferred from their names
// and values — confirm against the transcript helpers.
/// Topic name for channel lifecycle transcript entries.
pub(crate) const CHANNEL_TRANSCRIPT_TOPIC: &str = "transcript.channel.lifecycle";
/// Transcript kind for a channel emit.
pub(crate) const CHANNEL_EMIT_TRANSCRIPT_KIND: &str = "transcript.channel.emit";
/// Transcript kind for a channel match.
pub(crate) const CHANNEL_MATCH_TRANSCRIPT_KIND: &str = "transcript.channel.match";

/// Event-log topic that receives channel audit events (emit receipts and
/// guardrail audits are appended here by `append_channel_audit_event`).
pub const CHANNEL_AUDIT_TOPIC: &str = "lifecycle.channel.audit";
/// Audit event kind for an emit receipt.
pub(crate) const CHANNEL_EMIT_RECEIPT_KIND: &str = "channel_emit_receipt";
/// Audit event kind for a match receipt.
pub(crate) const CHANNEL_MATCH_RECEIPT_KIND: &str = "channel_match_receipt";
/// Value of the `schema` header attached to emit receipt audit events.
const CHANNEL_EMIT_RECEIPT_SCHEMA: &str = "harn.channel_emit_receipt.v1";
/// Schema identifier for match receipts (presumably used as the `schema`
/// header like the emit schema — the call site is outside this view).
const CHANNEL_MATCH_RECEIPT_SCHEMA: &str = "harn.channel_match_receipt.v1";

/// Audit event kind recorded when a guardrail blocks an emit.
pub(crate) const CHANNEL_GUARDRAIL_BLOCKED_KIND: &str = "channel_guardrail_blocked";
/// Audit event kind recorded when guardrails fired but did not block the emit.
pub(crate) const CHANNEL_GUARDRAIL_WARNING_KIND: &str = "channel_guardrail_warning";
/// Value of the `schema` header attached to guardrail audit events.
const CHANNEL_GUARDRAIL_AUDIT_SCHEMA: &str = "harn.channel_guardrail_audit.v1";

// NOTE(review): the two trace headers are not referenced in the visible part
// of this file; presumably they propagate the emit span to trigger handlers.
/// Header name for the trace id of the emitting span.
const EMIT_TRACE_ID_HEADER: &str = "harn.channel.emit_trace_id";
/// Header name for the span id of the emitting span.
const EMIT_SPAN_ID_HEADER: &str = "harn.channel.emit_span_id";

/// Lazily created in-memory log backing session-scoped channels; cleared by
/// `reset_channel_state`.
static SESSION_CHANNEL_LOG: OnceLock<Mutex<Option<Arc<AnyEventLog>>>> = OnceLock::new();
/// Process-wide HMAC key material for signed timestamps, generated on first use.
static SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
66
/// Visibility scope of a channel. Serialized in snake_case, matching the
/// keywords accepted by `ChannelScope::parse`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ChannelScope {
    /// Scoped to a session id.
    Session,
    /// Scoped to a pipeline (workflow or run) id.
    Pipeline,
    /// Scoped to a tenant id; the default when no scope is given.
    Tenant,
    /// Parsed, but currently rejected by `resolve_channel`.
    Org,
}
75
76impl ChannelScope {
77 fn parse(value: &str) -> Result<Self, ChannelError> {
78 match value.trim() {
79 "session" => Ok(Self::Session),
80 "pipeline" => Ok(Self::Pipeline),
81 "tenant" => Ok(Self::Tenant),
82 "org" => Ok(Self::Org),
83 other => Err(ChannelError::malformed(format!(
84 "HARN-CHN-003 malformed channel scope '{other}'"
85 ))),
86 }
87 }
88
89 fn as_str(self) -> &'static str {
90 match self {
91 Self::Session => "session",
92 Self::Pipeline => "pipeline",
93 Self::Tenant => "tenant",
94 Self::Org => "org",
95 }
96 }
97}
98
/// Snapshot of the caller's runtime identity used to resolve channel scopes.
/// Built by `ChannelContext::current`; every field is `None` when no VM
/// context is available.
#[derive(Clone, Debug, Default)]
struct ChannelContext {
    /// Task id copied from the VM's runtime context.
    task_id: Option<String>,
    /// Root task id copied from the VM's runtime context.
    root_task_id: Option<String>,
    /// Scope id copied from the VM's runtime context.
    scope_id: Option<String>,
    /// `workflow_id` entry of the runtime context dict; preferred pipeline id.
    workflow_id: Option<String>,
    /// `run_id` entry of the runtime context dict; fallback pipeline id.
    run_id: Option<String>,
    /// `worker_id` entry of the runtime context dict.
    worker_id: Option<String>,
    /// `agent_session_id` entry of the runtime context dict, falling back to
    /// the current agent session when absent.
    agent_session_id: Option<String>,
    /// `root_agent_session_id` entry of the runtime context dict.
    root_agent_session_id: Option<String>,
    /// `tenant_id` entry of the runtime context dict.
    tenant_id: Option<String>,
}
111
/// Optional arguments accepted by the channel builtins.
// NOTE(review): produced by `parse_options`, which is outside this view.
#[derive(Clone, Debug, Default)]
struct ChannelOptions {
    /// Explicit scope; must agree with any scope prefix in the channel name.
    scope: Option<ChannelScope>,
    /// Caller-supplied event id; the emit path generates one when absent.
    id: Option<String>,
    /// Requested tenant id for tenant-scoped channels.
    tenant_id: Option<String>,
    /// Requested session id for session-scoped channels.
    session_id: Option<String>,
    /// Requested pipeline id for pipeline-scoped channels.
    pipeline_id: Option<String>,
    /// Cursor passed to the log when reading or subscribing.
    from_cursor: Option<EventId>,
    /// Maximum number of events to read; `channel_events` uses `usize::MAX` when absent.
    limit: Option<usize>,
    /// Time-to-live in milliseconds, stored verbatim on the emitted record.
    ttl_ms: Option<i64>,
}
123
/// A channel name after scope resolution and validation (see `resolve_channel`).
#[derive(Clone, Debug)]
struct ResolvedChannel {
    /// Effective scope of the channel.
    scope: ChannelScope,
    /// Identifier within the scope (session, pipeline or tenant id).
    scope_id: String,
    /// Canonical name in the form `<scope>:<scope_id>:<name>`.
    resolved_name: String,
    /// Event-log topic `channels.<scope>.<scope_id>.<name>`, with the last two
    /// components sanitized.
    topic: Topic,
    /// Retention label returned by `retention_for_scope` for this scope.
    retention: &'static str,
}
132
/// A timestamp together with an HMAC signature over the event identity it
/// belongs to (see `signed_timestamp` and `signed_match_timestamp`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedTimestamp {
    /// Timestamp in milliseconds, as produced by `harn_clock::offset_datetime_to_ms`.
    pub at_ms: i64,
    /// The same instant as text; RFC 3339 when formatting succeeds.
    pub at: String,
    /// Signature algorithm label (`"hmac-sha256"` for timestamps built in this file).
    pub algorithm: String,
    /// Identifier of the signing key (`"local-session"` for timestamps built in this file).
    pub key_id: String,
    /// Hex-encoded signature prefixed with `sha256:`.
    pub signature: String,
}
141
/// The record serialized as the event-log payload for a channel emit.
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct StoredChannelEvent {
    /// Channel event id (caller-supplied or generated).
    id: String,
    /// Resolved channel name.
    name: String,
    /// User payload converted to JSON.
    payload: serde_json::Value,
    /// Signed emit timestamp.
    emitted_at: SignedTimestamp,
    /// Identity string of the emitter.
    emitted_by: String,
    /// Scope keyword.
    scope: String,
    /// Resolved scope id.
    scope_id: String,
    /// Pipeline id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pipeline_id: Option<String>,
    /// Session id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    session_id: Option<String>,
    /// Tenant id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    tenant_id: Option<String>,
    /// Retention label for the channel's scope.
    retention: String,
    /// Time-to-live in milliseconds taken from the emit options.
    #[serde(skip_serializing_if = "Option::is_none")]
    ttl_ms: Option<i64>,
}
161
/// Audit receipt written to the channel audit topic for every emit that
/// reaches the event log (see `record_channel_emit_receipt`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelEmitReceipt {
    /// Channel event id.
    pub event_id: String,
    /// Resolved channel name.
    pub name_resolved: String,
    /// Scope keyword.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// `sha256:`-prefixed hash of the canonical payload (see `channel_payload_hash`).
    pub payload_hash: String,
    /// Copy of the emitted payload.
    pub payload: serde_json::Value,
    /// Signed emit timestamp.
    pub emitted_at: SignedTimestamp,
    /// Identity string of the emitter.
    pub emitted_by: String,
    /// Pipeline id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pipeline_id: Option<String>,
    /// Session id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Tenant id associated with the emit, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Event-log topic the event was appended to.
    pub topic: String,
    /// `inserted` flag reported by the idempotent append.
    pub inserted: bool,
    /// Span id of the emit; `None` when the recorded span id was 0.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
193
/// Audit receipt describing a channel event being matched to a trigger.
// NOTE(review): populated by `record_channel_match_receipt`, whose body is
// outside this view; field notes below are inferred from names and types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchReceipt {
    /// Id of the matched channel event.
    pub event_id: String,
    /// Id of the trigger that matched.
    pub trigger_id: String,
    /// Binding key of the matching trigger binding.
    pub binding_key: String,
    /// Resolved channel name.
    pub name_resolved: String,
    /// Scope keyword.
    pub scope: String,
    /// Resolved scope id.
    pub scope_id: String,
    /// Signed timestamp of the match.
    pub matched_at: SignedTimestamp,
    /// Session in which the match happened, if known; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub matched_in_session_id: Option<String>,
    /// Batch details when several events were delivered together; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch: Option<ChannelMatchBatchInfo>,
    /// Kind of handler that received the event.
    pub handler_kind: String,
    /// Summary of the handler dispatch result.
    pub handler_result: ChannelMatchResultSummary,
    /// Span id of the match, if any; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<u64>,
}
221
/// Batch information attached to a `ChannelMatchReceipt`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchBatchInfo {
    /// Number of events in the batch (presumably equal to
    /// `constituent_event_ids.len()` — confirm against the producer).
    pub count: usize,
    /// Ids of the events that make up the batch.
    pub constituent_event_ids: Vec<String>,
}
230
/// Condensed outcome of dispatching a matched channel event to a handler
/// (see `ChannelMatchResultSummary::from_dispatch`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChannelMatchResultSummary {
    /// Dispatch status string, or `"dispatch_error"` when dispatch itself failed.
    pub status: String,
    /// Attempt count reported by the dispatch outcome; 0 when dispatch failed.
    pub attempt_count: u32,
    /// Error text, if any; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// `Some(true)` when dispatch returned an error; omitted otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dispatch_failed: Option<bool>,
}
244
245impl ChannelMatchResultSummary {
246 fn from_dispatch(
247 outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
248 ) -> Self {
249 match outcome {
250 Ok(outcome) => Self {
251 status: outcome.status.as_str().to_string(),
252 attempt_count: outcome.attempt_count,
253 error: outcome.error.clone(),
254 dispatch_failed: None,
255 },
256 Err(error) => Self {
257 status: "dispatch_error".to_string(),
258 attempt_count: 0,
259 error: Some(error.to_string()),
260 dispatch_failed: Some(true),
261 },
262 }
263 }
264}
265
/// Error raised while resolving or validating a channel; wraps the complete
/// user-facing message, including its `HARN-CHN-xxx` code.
#[derive(Debug)]
struct ChannelError(String);

impl ChannelError {
    /// `HARN-CHN-001`: a pipeline-scoped channel was used without any pipeline context.
    fn missing_pipeline() -> Self {
        let text = "HARN-CHN-001 missing pipeline context for pipeline-scoped channel";
        Self(String::from(text))
    }

    /// `HARN-CHN-002`: prefixes `message` with the cross-tenant error code.
    fn cross_tenant(message: impl Into<String>) -> Self {
        let detail: String = message.into();
        Self(format!("HARN-CHN-002 {detail}"))
    }

    /// Wraps `message` unchanged; callers supply the `HARN-CHN-003` prefix themselves.
    fn malformed(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self(text)
    }

    /// `HARN-CHN-004`: prefixes `message` with the ambiguous-scope error code.
    fn scope_ambiguous(message: impl Into<String>) -> Self {
        let detail: String = message.into();
        Self(format!("HARN-CHN-004 {detail}"))
    }
}
286
287impl From<ChannelError> for VmError {
288 fn from(error: ChannelError) -> Self {
289 VmError::Runtime(error.0)
290 }
291}
292
293pub fn reset_channel_state() {
294 if let Some(slot) = SESSION_CHANNEL_LOG.get() {
295 *slot.lock().expect("channel session log poisoned") = None;
296 }
297 crate::channel_guardrails::clear();
301}
302
/// VM builtin `emit_channel(name, payload, options?)`.
///
/// Resolves the channel, runs the channel guardrails, appends the event to the
/// scope's event log keyed by the event id, records transcript and audit
/// entries, and — only when the append actually inserted a new event — fans
/// the event out to triggers. Returns the emit receipt as a VM value.
///
/// When a guardrail blocks the emit, nothing is appended to the channel topic
/// and the receipt built by `handle_blocked_emit` is returned instead.
///
/// # Errors
/// Fails when `name` or `payload` is missing or invalid, when channel
/// resolution fails, when guardrail evaluation fails, when the record cannot
/// be encoded, when the log append fails, or when trigger fan-out fails.
pub(crate) async fn emit_channel_from_vm(
    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    let name = required_string(args.first(), "emit_channel", "name")?;
    let payload = vm_value_to_json(
        args.get(1)
            .ok_or_else(|| VmError::TypeError("emit_channel: missing payload".to_string()))?,
    );
    let options = parse_options(args.get(2), "emit_channel")?;
    let context = ChannelContext::current(ctx);
    let resolved = resolve_channel(&name, &options, &context)?;
    // A caller-supplied id doubles as the idempotency key; otherwise generate one.
    let event_id = options
        .id
        .clone()
        .unwrap_or_else(|| format!("channel_evt_{}", uuid::Uuid::now_v7()));
    let emitted_by = emitted_by(&context);
    let emitted_at = signed_timestamp(&resolved, &event_id, &emitted_by);
    let occurred_at_ms = emitted_at.at_ms;

    // Context handed to the guardrails alongside the payload.
    let guardrail_context = serde_json::json!({
        "name": name,
        "name_resolved": resolved.resolved_name,
        "scope": resolved.scope.as_str(),
        "scope_id": resolved.scope_id,
        "event_id": event_id,
        "emitted_by": emitted_by,
    });
    let decision = crate::channel_guardrails::evaluate(
        ctx,
        &payload,
        &guardrail_context,
        &resolved.resolved_name,
    )
    .await?;
    // A blocking verdict short-circuits before anything reaches the channel topic.
    if matches!(
        decision.verdict,
        crate::channel_guardrails::Verdict::Block { .. }
    ) {
        return handle_blocked_emit(
            &name,
            &resolved,
            &event_id,
            &emitted_by,
            &emitted_at,
            &payload,
            &decision,
        )
        .await;
    }
    // Guardrails that fired without blocking are audited as warnings
    // (a no-op when none fired).
    record_guardrail_warnings(
        &resolved,
        &event_id,
        &emitted_by,
        &payload,
        decision.fired.as_slice(),
    )
    .await;
    let record = StoredChannelEvent {
        id: event_id.clone(),
        name: resolved.resolved_name.clone(),
        payload,
        emitted_at,
        emitted_by: emitted_by.clone(),
        scope: resolved.scope.as_str().to_string(),
        scope_id: resolved.scope_id.clone(),
        pipeline_id: context.pipeline_id_for_receipt(&resolved),
        session_id: context.session_id_for_receipt(&resolved),
        tenant_id: context.tenant_id_for_receipt(&resolved),
        retention: resolved.retention.to_string(),
        ttl_ms: options.ttl_ms,
    };

    // Open the emit span and attach the event's identifying metadata.
    let mut emit_span = ChannelSpanGuard::start(
        crate::tracing::SpanKind::ChannelEmit,
        format!("channel.emit {}", resolved.resolved_name),
        Vec::new(),
    );
    emit_span.set_metadata("event_id", serde_json::json!(record.id));
    emit_span.set_metadata("scope", serde_json::json!(resolved.scope.as_str()));
    emit_span.set_metadata("scope_id", serde_json::json!(resolved.scope_id));
    emit_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
    emit_span.set_metadata("payload_summary", summarize_payload(&record.payload));
    // 0 stands for "no current span"; the audit receipt maps it back to `None`.
    let emit_span_id = crate::tracing::current_span_id().unwrap_or(0);
    let emit_link = emit_span.link();

    let mut headers = BTreeMap::new();
    headers.insert(IDEMPOTENCY_HEADER.to_string(), event_id.clone());
    headers.insert(NAME_HEADER.to_string(), resolved.resolved_name.clone());
    headers.insert(
        SCOPE_HEADER.to_string(),
        resolved.scope.as_str().to_string(),
    );
    headers.insert(SCOPE_ID_HEADER.to_string(), resolved.scope_id.clone());
    headers.insert(EMITTED_BY_HEADER.to_string(), emitted_by.clone());

    let log = log_for_scope(resolved.scope);
    let mut log_event = LogEvent::new(
        CHANNEL_EVENT_KIND,
        serde_json::to_value(&record)
            .map_err(|error| VmError::Runtime(format!("emit_channel: encode event: {error}")))?,
    )
    .with_headers(headers);
    // Keep the log's occurrence time equal to the signed timestamp.
    log_event.occurred_at_ms = occurred_at_ms;
    // NOTE(review): presumably returns the already-stored event with
    // `inserted == false` when the id header was seen before — confirm against
    // the event log implementation.
    let outcome = log
        .append_idempotent_by_header(&resolved.topic, IDEMPOTENCY_HEADER, &event_id, log_event)
        .await
        .map_err(channel_log_error)?;
    let receipt = receipt_value(
        &resolved.topic,
        outcome.event_id,
        &outcome.event,
        outcome.inserted,
    )?;
    emit_channel_emit_transcript(&record, &resolved, outcome.inserted, emit_span_id);
    record_channel_emit_receipt(&record, &resolved, outcome.inserted, emit_span_id).await;
    // Only an append that inserted a new event is fanned out to triggers.
    if outcome.inserted {
        // Fan out the payload as stored in the log, not the local copy.
        let payload_json = outcome
            .event
            .payload
            .get("payload")
            .cloned()
            .unwrap_or(serde_json::Value::Null);
        let context_for_fanout = ChannelContext::current(ctx);
        let fanout_payload = ChannelEventPayload {
            id: event_id.clone(),
            // Bare channel name without scope prefix; falls back to the
            // resolved name if re-parsing fails.
            name: parse_name(&name)
                .map(|parsed| parsed.name)
                .unwrap_or_else(|_| resolved.resolved_name.clone()),
            name_resolved: resolved.resolved_name.clone(),
            scope: resolved.scope.as_str().to_string(),
            scope_id: resolved.scope_id.clone(),
            payload: payload_json,
            emitted_by: emitted_by.clone(),
            tenant_id: context_for_fanout.tenant_id_for_receipt(&resolved),
            session_id: context_for_fanout.session_id_for_receipt(&resolved),
            pipeline_id: context_for_fanout.pipeline_id_for_receipt(&resolved),
        };
        dispatch_channel_emit_to_triggers(ctx, &resolved, fanout_payload, emit_link).await?;
    }
    emit_span.end();
    Ok(crate::stdlib::json_to_vm_value(&receipt))
}
465
466pub(crate) async fn channel_events_from_vm(
467 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
468 args: Vec<VmValue>,
469) -> Result<VmValue, VmError> {
470 let name = required_string(args.first(), "channel_events", "name")?;
471 let options = parse_options(args.get(1), "channel_events")?;
472 let context = ChannelContext::current(ctx);
473 let resolved = resolve_channel(&name, &options, &context)?;
474 let events = log_for_scope(resolved.scope)
475 .read_range(
476 &resolved.topic,
477 options.from_cursor,
478 options.limit.unwrap_or(usize::MAX),
479 )
480 .await
481 .map_err(channel_log_error)?;
482 let values = events
483 .into_iter()
484 .map(|(event_id, event)| event_value(&resolved.topic, event_id, event))
485 .collect::<Result<Vec<_>, _>>()?;
486 Ok(crate::stdlib::json_to_vm_value(&serde_json::Value::Array(
487 values,
488 )))
489}
490
491pub(crate) async fn channel_subscribe_from_vm(
492 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
493 args: Vec<VmValue>,
494) -> Result<VmValue, VmError> {
495 let name = required_string(args.first(), "channel_subscribe", "name")?;
496 let options = parse_options(args.get(1), "channel_subscribe")?;
497 let context = ChannelContext::current(ctx);
498 let resolved = resolve_channel(&name, &options, &context)?;
499 let topic = resolved.topic.clone();
500 let mut events = log_for_scope(resolved.scope)
501 .subscribe(&topic, options.from_cursor)
502 .await
503 .map_err(channel_log_error)?;
504 let (tx, rx) = tokio::sync::mpsc::channel::<Result<VmValue, VmError>>(1);
505 tokio::task::spawn_local(async move {
506 while let Some(next) = events.next().await {
507 let value = match next {
508 Ok((event_id, event)) => event_value(&topic, event_id, event)
509 .map(|value| crate::stdlib::json_to_vm_value(&value)),
510 Err(error) => Err(channel_log_error(error)),
511 };
512 if tx.send(value).await.is_err() {
513 return;
514 }
515 }
516 });
517 Ok(VmValue::stream(VmStream {
518 done: Arc::new(AtomicBool::new(false)),
519 receiver: Arc::new(tokio::sync::Mutex::new(rx)),
520 cancel: None,
521 }))
522}
523
524pub(crate) async fn channel_consumer_cursor_from_vm(
525 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
526 args: Vec<VmValue>,
527) -> Result<VmValue, VmError> {
528 let name = required_string(args.first(), "channel_consumer_cursor", "name")?;
529 let consumer = required_consumer_id(args.get(1), "channel_consumer_cursor")?;
530 let options = parse_options(args.get(2), "channel_consumer_cursor")?;
531 let context = ChannelContext::current(ctx);
532 let resolved = resolve_channel(&name, &options, &context)?;
533 let cursor = log_for_scope(resolved.scope)
534 .consumer_cursor(&resolved.topic, &consumer)
535 .await
536 .map_err(channel_log_error)?;
537 match cursor {
538 Some(event_id) => Ok(VmValue::Int(event_id_to_i64(
539 event_id,
540 "channel_consumer_cursor",
541 )?)),
542 None => Ok(VmValue::Nil),
543 }
544}
545
546pub(crate) async fn channel_ack_from_vm(
547 ctx: Option<&crate::vm::AsyncBuiltinCtx>,
548 args: Vec<VmValue>,
549) -> Result<VmValue, VmError> {
550 let name = required_string(args.first(), "channel_ack", "name")?;
551 let consumer = required_consumer_id(args.get(1), "channel_ack")?;
552 let cursor = required_event_id(args.get(2), "channel_ack", "cursor")?;
553 let options = parse_options(args.get(3), "channel_ack")?;
554 let context = ChannelContext::current(ctx);
555 let resolved = resolve_channel(&name, &options, &context)?;
556 log_for_scope(resolved.scope)
557 .ack(&resolved.topic, &consumer, cursor)
558 .await
559 .map_err(channel_log_error)?;
560 Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
561 "name": name,
562 "name_resolved": resolved.resolved_name,
563 "scope": resolved.scope.as_str(),
564 "scope_id": resolved.scope_id,
565 "topic": resolved.topic.as_str(),
566 "consumer_id": consumer.as_str(),
567 "cursor": cursor,
568 })))
569}
570
571impl ChannelContext {
572 fn current(ctx: Option<&crate::vm::AsyncBuiltinCtx>) -> Self {
573 let mut context = Self::default();
574 if let Some(vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) {
575 context.task_id = Some(vm.runtime_context.task_id.clone());
576 context.root_task_id = Some(vm.runtime_context.root_task_id.clone());
577 context.scope_id = vm.runtime_context.scope_id.clone();
578 if let VmValue::Dict(values) = crate::runtime_context::runtime_context_value(&vm) {
579 context.workflow_id = dict_string(&values, "workflow_id");
580 context.run_id = dict_string(&values, "run_id");
581 context.worker_id = dict_string(&values, "worker_id");
582 context.agent_session_id = dict_string(&values, "agent_session_id");
583 context.root_agent_session_id = dict_string(&values, "root_agent_session_id");
584 context.tenant_id = dict_string(&values, "tenant_id");
585 }
586 }
587 context.agent_session_id = context
588 .agent_session_id
589 .or_else(crate::agent_sessions::current_session_id);
590 context
591 }
592
593 fn session_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
594 if let Some(requested) = options.session_id.as_deref() {
598 if let Some(active) = self.agent_session_id.as_deref() {
599 if active != requested {
600 return Err(ChannelError::scope_ambiguous(format!(
601 "session scope ambiguous: options.session_id '{requested}' \
602 conflicts with active session '{active}'"
603 )));
604 }
605 }
606 }
607 Ok(options
608 .session_id
609 .clone()
610 .or_else(|| self.agent_session_id.clone())
611 .or_else(|| self.root_agent_session_id.clone())
612 .or_else(|| self.scope_id.clone())
613 .or_else(|| self.root_task_id.clone())
614 .unwrap_or_else(|| "session".to_string()))
615 }
616
617 fn pipeline_id(&self, options: &ChannelOptions) -> Result<String, ChannelError> {
618 let active = self.workflow_id.clone().or_else(|| self.run_id.clone());
622 if let (Some(requested), Some(active)) = (options.pipeline_id.as_deref(), active.as_deref())
623 {
624 if requested != active {
625 return Err(ChannelError::scope_ambiguous(format!(
626 "pipeline scope ambiguous: options.pipeline_id '{requested}' \
627 conflicts with active pipeline '{active}'"
628 )));
629 }
630 }
631 options
632 .pipeline_id
633 .clone()
634 .or(active)
635 .ok_or_else(ChannelError::missing_pipeline)
636 }
637
638 fn tenant_id(
639 &self,
640 options: &ChannelOptions,
641 requested: Option<&str>,
642 ) -> Result<String, ChannelError> {
643 let current = self.tenant_id.as_deref();
644 let requested = requested
645 .map(ToOwned::to_owned)
646 .or_else(|| options.tenant_id.clone());
647 if let (Some(current), Some(requested)) = (current, requested.as_deref()) {
648 if current != requested {
649 return Err(ChannelError::cross_tenant(format!(
650 "cross-tenant channel emit requires a grant: current tenant '{current}', requested tenant '{requested}'"
651 )));
652 }
653 }
654 Ok(requested
655 .or_else(|| self.tenant_id.clone())
656 .unwrap_or_else(|| "default".to_string()))
657 }
658
659 fn pipeline_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
660 match resolved.scope {
661 ChannelScope::Pipeline => Some(resolved.scope_id.clone()),
662 _ => self.workflow_id.clone().or_else(|| self.run_id.clone()),
663 }
664 }
665
666 fn session_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
667 match resolved.scope {
668 ChannelScope::Session => Some(resolved.scope_id.clone()),
669 _ => self
670 .agent_session_id
671 .clone()
672 .or_else(|| self.root_agent_session_id.clone()),
673 }
674 }
675
676 fn tenant_id_for_receipt(&self, resolved: &ResolvedChannel) -> Option<String> {
677 match resolved.scope {
678 ChannelScope::Tenant => Some(resolved.scope_id.clone()),
679 _ => self.tenant_id.clone(),
680 }
681 }
682}
683
684fn resolve_channel(
685 raw_name: &str,
686 options: &ChannelOptions,
687 context: &ChannelContext,
688) -> Result<ResolvedChannel, ChannelError> {
689 let parsed = parse_name(raw_name)?;
690 if let Some(option_scope) = options.scope {
691 if let Some(prefix_scope) = parsed.scope {
692 if prefix_scope != option_scope {
693 return Err(ChannelError::malformed(format!(
694 "HARN-CHN-003 channel scope prefix '{}' conflicts with options.scope '{}'",
695 prefix_scope.as_str(),
696 option_scope.as_str()
697 )));
698 }
699 }
700 }
701
702 let scope = parsed
703 .scope
704 .or(options.scope)
705 .unwrap_or(ChannelScope::Tenant);
706 if scope == ChannelScope::Org {
707 return Err(ChannelError::cross_tenant(
708 "org-scoped channels are disabled until org grants are available",
709 ));
710 }
711
712 validate_channel_name(&parsed.name)?;
713 let scope_id = match scope {
714 ChannelScope::Session => match parsed.scope_id.clone() {
715 Some(id) => id,
716 None => context.session_id(options)?,
717 },
718 ChannelScope::Pipeline => context.pipeline_id(options)?,
719 ChannelScope::Tenant => context.tenant_id(options, parsed.scope_id.as_deref())?,
720 ChannelScope::Org => unreachable!("org scope returned above"),
721 };
722 validate_scope_id(scope, &scope_id)?;
723 let resolved_name = format!("{}:{}:{}", scope.as_str(), scope_id, parsed.name);
724 let topic = Topic::new(format!(
725 "channels.{}.{}.{}",
726 scope.as_str(),
727 sanitize_topic_component(&scope_id),
728 sanitize_topic_component(&parsed.name)
729 ))
730 .map_err(|error| ChannelError::malformed(format!("HARN-CHN-003 {error}")))?;
731 Ok(ResolvedChannel {
732 scope,
733 scope_id,
734 resolved_name,
735 topic,
736 retention: retention_for_scope(scope),
737 })
738}
739
/// Result of splitting a raw channel name into its optional scope prefix,
/// optional scope id, and bare name (see `parse_name`).
#[derive(Clone, Debug)]
struct ParsedName {
    /// Scope taken from the name prefix; `None` when the name has no `:`.
    scope: Option<ChannelScope>,
    /// Scope id embedded in the name (tenant and org forms only).
    scope_id: Option<String>,
    /// Channel name with any scope prefix and scope id removed.
    name: String,
}
746
747fn parse_name(raw_name: &str) -> Result<ParsedName, ChannelError> {
748 let raw_name = raw_name.trim();
749 if raw_name.is_empty() {
750 return Err(ChannelError::malformed(
751 "HARN-CHN-003 channel name cannot be empty",
752 ));
753 }
754 let Some((prefix, rest)) = raw_name.split_once(':') else {
755 return Ok(ParsedName {
756 scope: None,
757 scope_id: None,
758 name: raw_name.to_string(),
759 });
760 };
761 let scope = ChannelScope::parse(prefix)?;
762 match scope {
763 ChannelScope::Session | ChannelScope::Pipeline => {
764 if rest.is_empty() || rest.contains(':') {
765 return Err(ChannelError::malformed(format!(
766 "HARN-CHN-003 malformed {} channel name '{raw_name}'",
767 scope.as_str()
768 )));
769 }
770 Ok(ParsedName {
771 scope: Some(scope),
772 scope_id: None,
773 name: rest.to_string(),
774 })
775 }
776 ChannelScope::Tenant => {
777 if rest.is_empty() {
778 return Err(ChannelError::malformed(
779 "HARN-CHN-003 tenant channel name cannot be empty",
780 ));
781 }
782 let (scope_id, name) = match rest.split_once(':') {
783 Some((tenant_id, name)) if !tenant_id.is_empty() && !name.is_empty() => {
784 (Some(tenant_id.to_string()), name.to_string())
785 }
786 Some(_) => {
787 return Err(ChannelError::malformed(format!(
788 "HARN-CHN-003 malformed tenant channel name '{raw_name}'"
789 )))
790 }
791 None => (None, rest.to_string()),
792 };
793 Ok(ParsedName {
794 scope: Some(scope),
795 scope_id,
796 name,
797 })
798 }
799 ChannelScope::Org => {
800 let Some((org_id, name)) = rest.split_once(':') else {
801 return Err(ChannelError::malformed(format!(
802 "HARN-CHN-003 org channel names must be org:<org_id>:<name>, got '{raw_name}'"
803 )));
804 };
805 if org_id.is_empty() || name.is_empty() {
806 return Err(ChannelError::malformed(format!(
807 "HARN-CHN-003 malformed org channel name '{raw_name}'"
808 )));
809 }
810 Ok(ParsedName {
811 scope: Some(scope),
812 scope_id: Some(org_id.to_string()),
813 name: name.to_string(),
814 })
815 }
816 }
817}
818
819fn validate_channel_name(name: &str) -> Result<(), ChannelError> {
820 if name.trim().is_empty()
821 || name.contains(':')
822 || name.chars().any(|ch| ch.is_control() || ch.is_whitespace())
823 {
824 return Err(ChannelError::malformed(format!(
825 "HARN-CHN-003 malformed channel name '{name}'"
826 )));
827 }
828 Ok(())
829}
830
831fn validate_scope_id(scope: ChannelScope, scope_id: &str) -> Result<(), ChannelError> {
832 if scope_id.trim().is_empty()
833 || scope_id
834 .chars()
835 .any(|ch| ch.is_control() || ch.is_whitespace() || ch == ':')
836 {
837 return Err(ChannelError::malformed(format!(
838 "HARN-CHN-003 malformed {} scope id '{scope_id}'",
839 scope.as_str()
840 )));
841 }
842 Ok(())
843}
844
845fn log_for_scope(scope: ChannelScope) -> Arc<AnyEventLog> {
846 match scope {
847 ChannelScope::Session => {
848 let slot = SESSION_CHANNEL_LOG.get_or_init(|| Mutex::new(None));
849 let mut guard = slot.lock().expect("channel session log poisoned");
850 guard
851 .get_or_insert_with(|| {
852 Arc::new(AnyEventLog::Memory(crate::event_log::MemoryEventLog::new(
853 CHANNEL_QUEUE_DEPTH,
854 )))
855 })
856 .clone()
857 }
858 ChannelScope::Pipeline | ChannelScope::Tenant => active_event_log()
859 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH)),
860 ChannelScope::Org => unreachable!("org-scoped channel log is disabled"),
861 }
862}
863
864fn signed_timestamp(
865 resolved: &ResolvedChannel,
866 event_id: &str,
867 emitted_by: &str,
868) -> SignedTimestamp {
869 let at = crate::clock_mock::now_utc();
870 let at_ms = harn_clock::offset_datetime_to_ms(at);
871 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
872 let material = format!(
873 "harn.channel.timestamp.v1\nat_ms={at_ms}\nid={event_id}\nname={}\nscope={}\nscope_id={}\nemitted_by={emitted_by}\n",
874 resolved.resolved_name,
875 resolved.scope.as_str(),
876 resolved.scope_id
877 );
878 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
879 signing_salt(),
880 material.as_bytes(),
881 ));
882 SignedTimestamp {
883 at_ms,
884 at: at_text,
885 algorithm: "hmac-sha256".to_string(),
886 key_id: "local-session".to_string(),
887 signature: format!("sha256:{signature}"),
888 }
889}
890
891fn signing_salt() -> &'static [u8] {
892 SIGNING_SALT
893 .get_or_init(|| {
894 format!(
895 "harn-channel-signing-salt:{}:{}",
896 std::process::id(),
897 uuid::Uuid::now_v7()
898 )
899 .into_bytes()
900 })
901 .as_slice()
902}
903
904fn signed_match_timestamp(
911 resolved: &ResolvedChannel,
912 event_id: &str,
913 trigger_id: &str,
914) -> SignedTimestamp {
915 let at = crate::clock_mock::now_utc();
916 let at_ms = harn_clock::offset_datetime_to_ms(at);
917 let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
918 let material = format!(
919 "harn.channel.match_timestamp.v1\nat_ms={at_ms}\nevent_id={event_id}\ntrigger_id={trigger_id}\nname={}\nscope={}\nscope_id={}\n",
920 resolved.resolved_name,
921 resolved.scope.as_str(),
922 resolved.scope_id
923 );
924 let signature = hex::encode(crate::connectors::hmac::hmac_sha256(
925 signing_salt(),
926 material.as_bytes(),
927 ));
928 SignedTimestamp {
929 at_ms,
930 at: at_text,
931 algorithm: "hmac-sha256".to_string(),
932 key_id: "local-session".to_string(),
933 signature: format!("sha256:{signature}"),
934 }
935}
936
937pub fn channel_payload_hash(payload: &serde_json::Value) -> String {
944 let canonical = canonical_json_string(payload);
945 let digest = Sha256::digest(canonical.as_bytes());
946 format!("sha256:{}", hex::encode(digest))
947}
948
949fn canonical_json_string(value: &serde_json::Value) -> String {
955 match value {
956 serde_json::Value::Object(map) => {
957 let mut sorted: std::collections::BTreeMap<&String, &serde_json::Value> =
958 std::collections::BTreeMap::new();
959 for (key, value) in map {
960 sorted.insert(key, value);
961 }
962 let parts: Vec<String> = sorted
963 .iter()
964 .map(|(key, value)| {
965 format!(
966 "{}:{}",
967 serde_json::to_string(key).unwrap_or_else(|_| (*key).clone()),
968 canonical_json_string(value)
969 )
970 })
971 .collect();
972 format!("{{{}}}", parts.join(","))
973 }
974 serde_json::Value::Array(items) => {
975 let parts: Vec<String> = items.iter().map(canonical_json_string).collect();
976 format!("[{}]", parts.join(","))
977 }
978 other => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
979 }
980}
981
982async fn append_channel_audit_event(
991 kind: &'static str,
992 schema: &'static str,
993 payload: serde_json::Value,
994) {
995 let topic = match Topic::new(CHANNEL_AUDIT_TOPIC) {
996 Ok(topic) => topic,
997 Err(_) => return,
998 };
999 let log = active_event_log()
1000 .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
1001 let mut headers = BTreeMap::new();
1002 headers.insert("schema".to_string(), schema.to_string());
1003 let _ = log
1004 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
1005 .await;
1006}
1007
1008async fn record_guardrail_audit(
1016 kind: &'static str,
1017 resolved: &ResolvedChannel,
1018 event_id: &str,
1019 emitted_by: &str,
1020 payload: &serde_json::Value,
1021 fired: &[crate::channel_guardrails::FiredGuardrail],
1022) {
1023 let fired_json: Vec<serde_json::Value> = fired
1024 .iter()
1025 .map(|entry| {
1026 serde_json::json!({
1027 "id": entry.id,
1028 "kind": entry.kind,
1029 "verdict_label": entry.verdict_label,
1030 "reason": entry.reason,
1031 })
1032 })
1033 .collect();
1034 let audit_payload = serde_json::json!({
1035 "event_id": event_id,
1036 "name_resolved": resolved.resolved_name,
1037 "scope": resolved.scope.as_str(),
1038 "scope_id": resolved.scope_id,
1039 "emitted_by": emitted_by,
1040 "payload_hash": channel_payload_hash(payload),
1041 "payload": payload,
1042 "fired": fired_json,
1043 });
1044 append_channel_audit_event(kind, CHANNEL_GUARDRAIL_AUDIT_SCHEMA, audit_payload.clone()).await;
1045 crate::orchestration::record_lifecycle_audit(kind, audit_payload);
1049}
1050
1051async fn record_guardrail_warnings(
1056 resolved: &ResolvedChannel,
1057 event_id: &str,
1058 emitted_by: &str,
1059 payload: &serde_json::Value,
1060 fired: &[crate::channel_guardrails::FiredGuardrail],
1061) {
1062 if fired.is_empty() {
1063 return;
1064 }
1065 record_guardrail_audit(
1066 CHANNEL_GUARDRAIL_WARNING_KIND,
1067 resolved,
1068 event_id,
1069 emitted_by,
1070 payload,
1071 fired,
1072 )
1073 .await;
1074}
1075
1076async fn handle_blocked_emit(
1082 raw_name: &str,
1083 resolved: &ResolvedChannel,
1084 event_id: &str,
1085 emitted_by: &str,
1086 emitted_at: &SignedTimestamp,
1087 payload: &serde_json::Value,
1088 decision: &crate::channel_guardrails::GuardrailDecision,
1089) -> Result<VmValue, VmError> {
1090 record_guardrail_audit(
1091 CHANNEL_GUARDRAIL_BLOCKED_KIND,
1092 resolved,
1093 event_id,
1094 emitted_by,
1095 payload,
1096 decision.fired.as_slice(),
1097 )
1098 .await;
1099 let block_reason = decision
1100 .fired
1101 .iter()
1102 .rev()
1103 .find_map(|f| {
1104 if f.verdict_label == CHANNEL_GUARDRAIL_BLOCKED_KIND
1105 || f.verdict_label.contains("block")
1106 {
1107 Some(f.reason.clone())
1108 } else {
1109 None
1110 }
1111 })
1112 .unwrap_or_else(|| "guardrail blocked".to_string());
1113 let fired_json: Vec<serde_json::Value> = decision
1114 .fired
1115 .iter()
1116 .map(|entry| {
1117 serde_json::json!({
1118 "id": entry.id,
1119 "kind": entry.kind,
1120 "verdict_label": entry.verdict_label,
1121 "reason": entry.reason,
1122 })
1123 })
1124 .collect();
1125 let receipt = serde_json::json!({
1126 "event_id": event_id,
1127 "cursor": serde_json::Value::Null,
1128 "id": event_id,
1129 "name": raw_name,
1130 "name_resolved": resolved.resolved_name,
1131 "scope": resolved.scope.as_str(),
1132 "scope_id": resolved.scope_id,
1133 "emitted_at": emitted_at,
1134 "emitted_by": emitted_by,
1135 "retention": resolved.retention,
1136 "topic": resolved.topic.as_str(),
1137 "inserted": false,
1138 "duplicate": false,
1139 "blocked": true,
1140 "block_reason": block_reason,
1141 "guardrail_fired": fired_json,
1142 });
1143 Ok(crate::stdlib::json_to_vm_value(&receipt))
1144}
1145
1146async fn record_channel_emit_receipt(
1151 record: &StoredChannelEvent,
1152 resolved: &ResolvedChannel,
1153 inserted: bool,
1154 span_id: u64,
1155) {
1156 let receipt = ChannelEmitReceipt {
1157 event_id: record.id.clone(),
1158 name_resolved: resolved.resolved_name.clone(),
1159 scope: resolved.scope.as_str().to_string(),
1160 scope_id: resolved.scope_id.clone(),
1161 payload_hash: channel_payload_hash(&record.payload),
1162 payload: record.payload.clone(),
1163 emitted_at: record.emitted_at.clone(),
1164 emitted_by: record.emitted_by.clone(),
1165 pipeline_id: record.pipeline_id.clone(),
1166 session_id: record.session_id.clone(),
1167 tenant_id: record.tenant_id.clone(),
1168 topic: resolved.topic.as_str().to_string(),
1169 inserted,
1170 span_id: if span_id == 0 { None } else { Some(span_id) },
1171 };
1172 let payload = match serde_json::to_value(&receipt) {
1173 Ok(value) => value,
1174 Err(_) => return,
1175 };
1176 append_channel_audit_event(
1177 CHANNEL_EMIT_RECEIPT_KIND,
1178 CHANNEL_EMIT_RECEIPT_SCHEMA,
1179 payload,
1180 )
1181 .await;
1182}
1183
1184#[allow(clippy::too_many_arguments)]
1190async fn record_channel_match_receipt(
1191 trigger_id: &str,
1192 binding_key: &str,
1193 handler_kind: &str,
1194 resolved: &ResolvedChannel,
1195 event_id: &str,
1196 matched_in_session_id: Option<&str>,
1197 batch: Option<ChannelMatchBatchInfo>,
1198 span_id: u64,
1199 dispatch_outcome: &Result<crate::triggers::DispatchOutcome, crate::triggers::DispatchError>,
1200) {
1201 let receipt = ChannelMatchReceipt {
1202 event_id: event_id.to_string(),
1203 trigger_id: trigger_id.to_string(),
1204 binding_key: binding_key.to_string(),
1205 name_resolved: resolved.resolved_name.clone(),
1206 scope: resolved.scope.as_str().to_string(),
1207 scope_id: resolved.scope_id.clone(),
1208 matched_at: signed_match_timestamp(resolved, event_id, trigger_id),
1209 matched_in_session_id: matched_in_session_id.map(|s| s.to_string()),
1210 batch,
1211 handler_kind: handler_kind.to_string(),
1212 handler_result: ChannelMatchResultSummary::from_dispatch(dispatch_outcome),
1213 span_id: if span_id == 0 { None } else { Some(span_id) },
1214 };
1215 let payload = match serde_json::to_value(&receipt) {
1216 Ok(value) => value,
1217 Err(_) => return,
1218 };
1219 append_channel_audit_event(
1220 CHANNEL_MATCH_RECEIPT_KIND,
1221 CHANNEL_MATCH_RECEIPT_SCHEMA,
1222 payload,
1223 )
1224 .await;
1225}
1226
1227fn batch_info_from_summary(
1231 batch_summary: Option<&serde_json::Value>,
1232) -> Option<ChannelMatchBatchInfo> {
1233 let summary = batch_summary?.as_object()?;
1234 let count = summary
1235 .get("count")
1236 .and_then(|v| v.as_u64())
1237 .map(|n| n as usize)?;
1238 let constituent_event_ids = summary
1239 .get("constituent_event_ids")
1240 .and_then(|v| v.as_array())
1241 .map(|arr| {
1242 arr.iter()
1243 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1244 .collect()
1245 })
1246 .unwrap_or_default();
1247 Some(ChannelMatchBatchInfo {
1248 count,
1249 constituent_event_ids,
1250 })
1251}
1252
1253fn emitted_by(context: &ChannelContext) -> String {
1254 context
1255 .worker_id
1256 .clone()
1257 .or_else(|| context.agent_session_id.clone())
1258 .or_else(|| context.task_id.clone())
1259 .unwrap_or_else(|| "harn".to_string())
1260}
1261
1262fn retention_for_scope(scope: ChannelScope) -> &'static str {
1263 match scope {
1264 ChannelScope::Session => "in_process_session",
1265 ChannelScope::Pipeline => "pipeline_event_log",
1266 ChannelScope::Tenant => "tenant_event_log",
1267 ChannelScope::Org => "org_event_log",
1268 }
1269}
1270
1271fn receipt_value(
1272 topic: &Topic,
1273 event_id: EventId,
1274 event: &LogEvent,
1275 inserted: bool,
1276) -> Result<serde_json::Value, VmError> {
1277 let record = stored_record(event)?;
1278 Ok(serde_json::json!({
1279 "event_id": event_id,
1280 "cursor": event_id,
1281 "id": record.id,
1282 "name": record.name,
1283 "name_resolved": record.name,
1284 "scope": record.scope,
1285 "scope_id": record.scope_id,
1286 "payload": record.payload,
1287 "emitted_at": record.emitted_at,
1288 "emitted_by": record.emitted_by,
1289 "pipeline_id": record.pipeline_id,
1290 "session_id": record.session_id,
1291 "tenant_id": record.tenant_id,
1292 "retention": record.retention,
1293 "ttl_ms": record.ttl_ms,
1294 "topic": topic.as_str(),
1295 "inserted": inserted,
1296 "duplicate": !inserted,
1297 }))
1298}
1299
1300fn event_value(
1301 topic: &Topic,
1302 event_id: EventId,
1303 event: LogEvent,
1304) -> Result<serde_json::Value, VmError> {
1305 let record = stored_record(&event)?;
1306 Ok(serde_json::json!({
1307 "event_id": event_id,
1308 "cursor": event_id,
1309 "topic": topic.as_str(),
1310 "kind": event.kind,
1311 "headers": event.headers,
1312 "occurred_at_ms": event.occurred_at_ms,
1313 "id": record.id,
1314 "name": record.name,
1315 "name_resolved": record.name,
1316 "scope": record.scope,
1317 "scope_id": record.scope_id,
1318 "payload": record.payload,
1319 "emitted_at": record.emitted_at,
1320 "emitted_by": record.emitted_by,
1321 "pipeline_id": record.pipeline_id,
1322 "session_id": record.session_id,
1323 "tenant_id": record.tenant_id,
1324 "retention": record.retention,
1325 "ttl_ms": record.ttl_ms,
1326 }))
1327}
1328
1329fn stored_record(event: &LogEvent) -> Result<StoredChannelEvent, VmError> {
1330 serde_json::from_value(event.payload.clone()).map_err(|error| {
1331 VmError::Runtime(format!(
1332 "channel event store contained malformed channel payload: {error}"
1333 ))
1334 })
1335}
1336
1337fn parse_options(value: Option<&VmValue>, builtin: &str) -> Result<ChannelOptions, VmError> {
1338 let Some(value) = value else {
1339 return Ok(ChannelOptions::default());
1340 };
1341 match value {
1342 VmValue::Nil => Ok(ChannelOptions::default()),
1343 VmValue::Dict(options) => Ok(ChannelOptions {
1344 scope: option_string(options, "scope", builtin)?
1345 .map(|scope| ChannelScope::parse(&scope))
1346 .transpose()
1347 .map_err(VmError::from)?,
1348 id: option_string(options, "id", builtin)?,
1349 tenant_id: option_string(options, "tenant_id", builtin)?,
1350 session_id: option_string(options, "session_id", builtin)?,
1351 pipeline_id: option_string(options, "pipeline_id", builtin)?,
1352 from_cursor: option_non_negative_int(options, "from_cursor", builtin)?
1353 .or(option_non_negative_int(options, "cursor", builtin)?)
1354 .map(|value| value as EventId),
1355 limit: option_non_negative_int(options, "limit", builtin)?.map(|value| value as usize),
1356 ttl_ms: option_duration_ms(options, "ttl", builtin)?,
1357 }),
1358 other => Err(VmError::TypeError(format!(
1359 "{builtin}: options must be a dict or nil, got {}",
1360 other.type_name()
1361 ))),
1362 }
1363}
1364
1365fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
1366 match value {
1367 Some(VmValue::String(value)) => Ok(value.to_string()),
1368 Some(other) => Err(VmError::TypeError(format!(
1369 "{builtin}: {name} must be a string, got {}",
1370 other.type_name()
1371 ))),
1372 None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1373 }
1374}
1375
1376fn required_consumer_id(value: Option<&VmValue>, builtin: &str) -> Result<ConsumerId, VmError> {
1377 ConsumerId::new(required_string(value, builtin, "consumer_id")?).map_err(channel_log_error)
1378}
1379
1380fn required_event_id(
1381 value: Option<&VmValue>,
1382 builtin: &str,
1383 name: &str,
1384) -> Result<EventId, VmError> {
1385 match value {
1386 Some(VmValue::Int(value)) if *value >= 0 => Ok(*value as EventId),
1387 Some(other) => Err(VmError::TypeError(format!(
1388 "{builtin}: {name} must be a non-negative int, got {}",
1389 other.type_name()
1390 ))),
1391 None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
1392 }
1393}
1394
1395fn event_id_to_i64(value: EventId, builtin: &str) -> Result<i64, VmError> {
1396 i64::try_from(value)
1397 .map_err(|_| VmError::Runtime(format!("{builtin}: event id {value} exceeds int range")))
1398}
1399
1400fn option_string(
1401 options: &crate::value::DictMap,
1402 key: &str,
1403 builtin: &str,
1404) -> Result<Option<String>, VmError> {
1405 match options.get(key) {
1406 None | Some(VmValue::Nil) => Ok(None),
1407 Some(VmValue::String(value)) if !value.trim().is_empty() => Ok(Some(value.to_string())),
1408 Some(VmValue::String(_)) => Err(VmError::TypeError(format!(
1409 "{builtin}: options.{key} cannot be empty"
1410 ))),
1411 Some(other) => Err(VmError::TypeError(format!(
1412 "{builtin}: options.{key} must be a string or nil, got {}",
1413 other.type_name()
1414 ))),
1415 }
1416}
1417
1418fn option_non_negative_int(
1419 options: &crate::value::DictMap,
1420 key: &str,
1421 builtin: &str,
1422) -> Result<Option<u64>, VmError> {
1423 match options.get(key) {
1424 None | Some(VmValue::Nil) => Ok(None),
1425 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value as u64)),
1426 Some(other) => Err(VmError::TypeError(format!(
1427 "{builtin}: options.{key} must be a non-negative int or nil, got {}",
1428 other.type_name()
1429 ))),
1430 }
1431}
1432
1433fn option_duration_ms(
1434 options: &crate::value::DictMap,
1435 key: &str,
1436 builtin: &str,
1437) -> Result<Option<i64>, VmError> {
1438 match options.get(key) {
1439 None | Some(VmValue::Nil) => Ok(None),
1440 Some(VmValue::Duration(value)) if *value >= 0 => Ok(Some(*value)),
1441 Some(VmValue::Int(value)) if *value >= 0 => Ok(Some(*value)),
1442 Some(other) => Err(VmError::TypeError(format!(
1443 "{builtin}: options.{key} must be a non-negative duration, int, or nil, got {}",
1444 other.type_name()
1445 ))),
1446 }
1447}
1448
1449fn dict_string(values: &crate::value::DictMap, key: &str) -> Option<String> {
1450 match values.get(key) {
1451 Some(VmValue::String(value)) if !value.is_empty() => Some(value.to_string()),
1452 _ => None,
1453 }
1454}
1455
1456fn channel_log_error(error: crate::event_log::LogError) -> VmError {
1457 VmError::Runtime(format!("channel event log: {error}"))
1458}
1459
/// Guard for one channel span: pairs the id of a `crate::tracing` span with
/// the `tracing::Span` created alongside it. The `crate::tracing` span is
/// ended on `end()` or when the guard is dropped.
struct ChannelSpanGuard {
    /// Id returned by the `crate::tracing` span-start call; reset to `0` once
    /// the span has been ended so that ending twice is a no-op.
    span_id: u64,
    /// `tracing` span created for the same operation; span links are attached
    /// to it and it is consulted first when building a link to this span.
    otel_span: tracing::Span,
}
1472
1473impl ChannelSpanGuard {
1474 fn start(
1475 kind: crate::tracing::SpanKind,
1476 name: String,
1477 links: Vec<crate::tracing::SpanLink>,
1478 ) -> Self {
1479 Self::start_with_parenting(kind, name, links, true)
1480 }
1481
1482 fn start_detached(
1483 kind: crate::tracing::SpanKind,
1484 name: String,
1485 links: Vec<crate::tracing::SpanLink>,
1486 ) -> Self {
1487 Self::start_with_parenting(kind, name, links, false)
1488 }
1489
1490 fn start_with_parenting(
1491 kind: crate::tracing::SpanKind,
1492 name: String,
1493 links: Vec<crate::tracing::SpanLink>,
1494 inherit_parent: bool,
1495 ) -> Self {
1496 let span_id = if inherit_parent {
1497 crate::tracing::span_start_with_links(kind, name.clone(), links.clone())
1498 } else {
1499 crate::tracing::span_start_detached_with_links(kind, name.clone(), links.clone())
1500 };
1501 let otel_span = tracing::info_span!(
1502 target: "harn.vm.channel",
1503 "harn.channel",
1504 harn.kind = kind.as_str(),
1505 harn.name = %name,
1506 );
1507 for link in links {
1508 let trace_id = crate::TraceId(link.trace_id);
1509 let mut attributes: std::collections::HashMap<String, String> =
1510 link.attributes.into_iter().collect();
1511 attributes
1512 .entry("harn.link.kind".to_string())
1513 .or_insert_with(|| "channel_emit".to_string());
1514 let _ = crate::observability::otel::set_span_link(
1515 &otel_span,
1516 &trace_id,
1517 &link.span_id,
1518 Some(attributes),
1519 );
1520 }
1521 Self { span_id, otel_span }
1522 }
1523
1524 fn link(&self) -> Option<crate::tracing::SpanLink> {
1525 crate::observability::otel::current_span_context_hex(&self.otel_span)
1526 .map(|(trace_id, span_id)| crate::tracing::SpanLink::new(trace_id, span_id))
1527 .or_else(|| crate::tracing::span_link(self.span_id))
1528 }
1529
1530 fn set_metadata(&self, key: &str, value: serde_json::Value) {
1531 crate::tracing::span_set_metadata(self.span_id, key, value);
1532 }
1533
1534 fn end(&mut self) {
1535 if self.span_id != 0 {
1536 crate::tracing::span_end(self.span_id);
1537 self.span_id = 0;
1538 }
1539 }
1540}
1541
1542impl Drop for ChannelSpanGuard {
1543 fn drop(&mut self) {
1544 self.end();
1545 }
1546}
1547
1548fn summarize_payload(payload: &serde_json::Value) -> serde_json::Value {
1555 const MAX_STRING_LEN: usize = 120;
1556 match payload {
1557 serde_json::Value::Null => serde_json::json!({"kind": "null"}),
1558 serde_json::Value::Bool(value) => serde_json::json!({"kind": "bool", "value": value}),
1559 serde_json::Value::Number(value) => serde_json::json!({"kind": "number", "value": value}),
1560 serde_json::Value::String(value) => {
1561 let truncated: String = value.chars().take(MAX_STRING_LEN).collect();
1562 let len = value.chars().count();
1563 serde_json::json!({
1564 "kind": "string",
1565 "value": truncated,
1566 "truncated": len > MAX_STRING_LEN,
1567 "length": len,
1568 })
1569 }
1570 serde_json::Value::Array(items) => {
1571 serde_json::json!({"kind": "array", "length": items.len()})
1572 }
1573 serde_json::Value::Object(map) => {
1574 let fields: Vec<&String> = map.keys().take(8).collect();
1575 serde_json::json!({
1576 "kind": "object",
1577 "field_count": map.len(),
1578 "fields": fields,
1579 })
1580 }
1581 }
1582}
1583
/// Appends one event of the given `kind` to `CHANNEL_TRANSCRIPT_TOPIC`,
/// synchronously and on a best-effort basis.
///
/// Nothing is written when there is no active event log or the topic cannot
/// be constructed. Append errors, thread-spawn failures and join errors are
/// all ignored.
fn emit_channel_transcript_event(kind: &'static str, payload: serde_json::Value) {
    let Some(log) = active_event_log() else {
        return;
    };
    let Ok(topic) = Topic::new(CHANNEL_TRANSCRIPT_TOPIC) else {
        return;
    };
    let event = LogEvent::new(kind, payload);
    if tokio::runtime::Handle::try_current().is_ok() {
        // A tokio runtime is active on this thread: run the append on a
        // dedicated helper thread and wait for it to finish.
        // NOTE(review): presumably this avoids driving `block_on` directly on
        // a runtime thread — confirm the intent.
        if let Ok(join) = std::thread::Builder::new()
            .name("harn-channel-transcript".to_string())
            .spawn(move || {
                let _ = futures::executor::block_on(log.append(&topic, event));
            })
        {
            let _ = join.join();
        }
    } else {
        // No runtime on this thread: block right here.
        let _ = futures::executor::block_on(log.append(&topic, event));
    }
}
1609
1610fn emit_channel_emit_transcript(
1615 record: &StoredChannelEvent,
1616 resolved: &ResolvedChannel,
1617 inserted: bool,
1618 span_id: u64,
1619) {
1620 let payload = serde_json::json!({
1621 "event_id": record.id,
1622 "name": record.name,
1623 "name_resolved": resolved.resolved_name,
1624 "scope": record.scope,
1625 "scope_id": record.scope_id,
1626 "payload_summary": summarize_payload(&record.payload),
1627 "emitted_at": record.emitted_at,
1628 "emitted_at_ms": record.emitted_at.at_ms,
1629 "emitted_by": record.emitted_by,
1630 "session_id": record.session_id,
1631 "pipeline_id": record.pipeline_id,
1632 "tenant_id": record.tenant_id,
1633 "inserted": inserted,
1634 "duplicate": !inserted,
1635 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1636 });
1637 emit_channel_transcript_event(CHANNEL_EMIT_TRANSCRIPT_KIND, payload);
1638}
1639
1640#[allow(clippy::too_many_arguments)]
1645fn emit_channel_match_transcript(
1646 trigger_id: &str,
1647 handler_kind: &str,
1648 resolved: &ResolvedChannel,
1649 event_id: &str,
1650 matched_at_ms: i64,
1651 matched_in_session_id: Option<&str>,
1652 span_id: u64,
1653 batch: Option<serde_json::Value>,
1654) {
1655 let mut payload = serde_json::json!({
1656 "event_id": event_id,
1657 "name_resolved": resolved.resolved_name,
1658 "scope": resolved.scope.as_str(),
1659 "scope_id": resolved.scope_id,
1660 "trigger_id": trigger_id,
1661 "handler_kind": handler_kind,
1662 "matched_at_ms": matched_at_ms,
1663 "matched_in_session_id": matched_in_session_id,
1664 "span_id": if span_id == 0 { serde_json::Value::Null } else { serde_json::json!(span_id) },
1665 });
1666 if let Some(batch) = batch {
1667 if let Some(map) = payload.as_object_mut() {
1668 map.insert("batch".to_string(), batch);
1669 }
1670 }
1671 emit_channel_transcript_event(CHANNEL_MATCH_TRANSCRIPT_KIND, payload);
1672}
1673
1674fn emit_links_from_event(event: &TriggerEvent) -> Vec<crate::tracing::SpanLink> {
1681 let mut links = Vec::new();
1682 if let (Some(trace_id), Some(span_id)) = (
1683 event.headers.get(EMIT_TRACE_ID_HEADER),
1684 event.headers.get(EMIT_SPAN_ID_HEADER),
1685 ) {
1686 links.push(
1687 crate::tracing::SpanLink::new(trace_id.clone(), span_id.clone()).with_attributes(
1688 BTreeMap::from([("harn.link.kind".to_string(), "channel_emit".to_string())]),
1689 ),
1690 );
1691 }
1692 links
1693}
1694
1695fn emit_links_from_batch(events: &[TriggerEvent]) -> Vec<crate::tracing::SpanLink> {
1696 let mut links = Vec::new();
1697 for event in events {
1698 links.extend(emit_links_from_event(event));
1699 }
1700 links
1701}
1702
1703fn batch_summary_for_transcript(events: &[TriggerEvent]) -> serde_json::Value {
1704 let constituent_ids: Vec<String> = events.iter().map(|event| event.id.0.clone()).collect();
1705 serde_json::json!({
1706 "count": events.len(),
1707 "constituent_event_ids": constituent_ids,
1708 })
1709}
1710
/// Parsed form of a `channel:...` selector string: which scope, which scope
/// id(s) and which channel name it selects. Built by `ChannelSelector::parse`
/// and evaluated with `ChannelSelector::matches`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelSelector {
    /// Scope the selector applies to.
    scope: ChannelScope,
    /// How the scope id of a candidate channel is compared.
    scope_id_pattern: ScopeIdPattern,
    /// Channel name that must match exactly.
    name: String,
}
1724
/// How a `ChannelSelector` constrains the scope id of a candidate channel.
#[derive(Clone, Debug, PartialEq, Eq)]
enum ScopeIdPattern {
    /// The selector named no scope id. For tenant scope the candidate must
    /// belong to the caller's current tenant; session and pipeline scope
    /// accept any scope id; org scope never matches.
    Current,
    /// The candidate's scope id must equal this value.
    Exact(String),
    /// The selector used `*` as scope id. Matches any scope id, but only for
    /// tenant scope.
    Wildcard,
}
1736
impl ChannelSelector {
    /// Parses a selector string.
    ///
    /// Surrounding whitespace is trimmed and the `channel:` prefix is
    /// mandatory. The remainder is interpreted as follows, where `<scope>` is
    /// whatever `ChannelScope::parse` accepts:
    ///
    /// - `channel:<name>` — first segment is not a scope: tenant scope,
    ///   current tenant. The whole remainder is the name, so it must not
    ///   contain further colons.
    /// - `channel:session:<name>` / `channel:pipeline:<name>` — current
    ///   session/pipeline; extra colons are rejected.
    /// - `channel:tenant:<name>` — tenant scope, current tenant.
    /// - `channel:tenant:<id>:<name>` / `channel:org:<id>:<name>` — exact
    ///   scope id, or any scope id when `<id>` is `*`.
    ///
    /// # Errors
    /// Returns a human-readable message when the prefix is missing, nothing
    /// follows it, a scope is given without a name segment, org scope is
    /// given without an id, an id or name is empty, or the name fails
    /// `validate_selector_name`.
    pub fn parse(input: &str) -> Result<Self, String> {
        let input = input.trim();
        let rest = input
            .strip_prefix("channel:")
            .ok_or_else(|| format!("channel selector must start with `channel:`, got `{input}`"))?;
        if rest.is_empty() {
            return Err("channel selector cannot be empty after `channel:` prefix".to_string());
        }

        // Split off the first segment; it is a scope only if it parses as one.
        let (head, tail_opt) = match rest.split_once(':') {
            Some((head, tail)) => (head, Some(tail)),
            None => (rest, None),
        };
        let parsed_scope = ChannelScope::parse(head).ok();
        match (parsed_scope, tail_opt) {
            (None, _) => {
                // Bare form: the entire remainder is the channel name.
                let name = rest.to_string();
                validate_selector_name(&name)?;
                Ok(Self {
                    scope: ChannelScope::Tenant,
                    scope_id_pattern: ScopeIdPattern::Current,
                    name,
                })
            }
            (Some(scope @ (ChannelScope::Session | ChannelScope::Pipeline)), Some(name))
                if !name.is_empty() =>
            {
                if name.contains(':') {
                    return Err(format!(
                        "channel selector `{input}`: {} scope expects `<name>` with no extra colons",
                        scope.as_str()
                    ));
                }
                validate_selector_name(name)?;
                Ok(Self {
                    scope,
                    scope_id_pattern: ScopeIdPattern::Current,
                    name: name.to_string(),
                })
            }
            (Some(scope @ (ChannelScope::Tenant | ChannelScope::Org)), Some(tail))
                if !tail.is_empty() =>
            {
                let Some((scope_id, name)) = tail.split_once(':') else {
                    if matches!(scope, ChannelScope::Tenant) {
                        // `channel:tenant:<name>`: no id given, use the current tenant.
                        validate_selector_name(tail)?;
                        return Ok(Self {
                            scope,
                            scope_id_pattern: ScopeIdPattern::Current,
                            name: tail.to_string(),
                        });
                    }
                    return Err(format!(
                        "channel selector `{input}`: org scope requires `<org-id>:<name>`"
                    ));
                };
                if scope_id.is_empty() || name.is_empty() {
                    return Err(format!(
                        "channel selector `{input}`: scope id and name must be non-empty"
                    ));
                }
                validate_selector_name(name)?;
                let pattern = if scope_id == "*" {
                    ScopeIdPattern::Wildcard
                } else {
                    ScopeIdPattern::Exact(scope_id.to_string())
                };
                Ok(Self {
                    scope,
                    scope_id_pattern: pattern,
                    name: name.to_string(),
                })
            }
            // A scope keyword with a missing or empty remainder.
            (Some(scope), _) => Err(format!(
                "channel selector `{input}`: {} scope requires `<name>` segment",
                scope.as_str()
            )),
        }
    }

    /// Returns the selector's scope as its string form.
    pub fn scope(&self) -> &'static str {
        self.scope.as_str()
    }

    /// Returns the channel name the selector targets.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Reports whether a channel identified by `scope`, `scope_id` and `name`
    /// is selected, given the caller's `current_tenant`.
    ///
    /// Scope and name must match exactly; the scope id is then checked
    /// according to the selector's pattern.
    pub fn matches(&self, scope: &str, scope_id: &str, name: &str, current_tenant: &str) -> bool {
        if self.scope.as_str() != scope || self.name != name {
            return false;
        }
        match &self.scope_id_pattern {
            ScopeIdPattern::Current => match self.scope {
                ChannelScope::Tenant => scope_id == current_tenant,
                // Session and pipeline selectors accept any scope id here.
                ChannelScope::Session | ChannelScope::Pipeline => {
                    true
                }
                ChannelScope::Org => false,
            },
            ScopeIdPattern::Exact(value) => scope_id == value,
            ScopeIdPattern::Wildcard => match self.scope {
                ChannelScope::Tenant => true,
                // NOTE(review): `parse` accepts `channel:org:*:<name>`, but a
                // wildcard never matches for non-tenant scopes — confirm this
                // is intended.
                _ => false,
            },
        }
    }
}
1865
/// Checks that a selector's channel name is well formed.
///
/// # Errors
/// Returns a message when the name is empty or blank, or contains a colon,
/// a control character or any whitespace.
fn validate_selector_name(name: &str) -> Result<(), String> {
    let is_blank = name.trim().is_empty();
    let has_forbidden_char = name
        .chars()
        .any(|ch| ch == ':' || ch.is_control() || ch.is_whitespace());
    if is_blank || has_forbidden_char {
        Err(format!("channel selector name `{name}` is malformed"))
    } else {
        Ok(())
    }
}
1875
/// Fans a freshly emitted channel event out to the trigger bindings that
/// subscribe to it.
///
/// Bindings are looked up by the resolved scope and scope id plus the
/// payload's name. Expired aggregation windows are flushed on every call,
/// before the early returns. Each binding's optional filter is applied to
/// the payload; bindings with aggregation accumulate the event and only fire
/// once a batch is ready, all others fire immediately.
///
/// Without a `ctx` no handler can run, so the function returns `Ok(())`
/// after the flush.
///
/// # Errors
/// Returns `VmError::Runtime` when a ready aggregation batch cannot be turned
/// into a batched event; bindings later in the list are then not processed.
async fn dispatch_channel_emit_to_triggers(
    ctx: Option<&crate::vm::AsyncBuiltinCtx>,
    resolved: &ResolvedChannel,
    payload: ChannelEventPayload,
    emit_link: Option<crate::tracing::SpanLink>,
) -> Result<(), VmError> {
    let bindings = crate::triggers::registry::channel_bindings_matching(
        resolved.scope.as_str(),
        &resolved.scope_id,
        &payload.name,
    );

    // Flush runs even when this emit matches no binding.
    flush_expired_aggregations_inner(ctx).await;

    if bindings.is_empty() {
        return Ok(());
    }
    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
        return Ok(());
    };
    // Fall back to an in-memory log when none is active on this thread.
    let log = active_event_log()
        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
    for binding in bindings {
        if let Some(filter_str) = binding.filter.as_ref() {
            if !channel_filter_matches(filter_str, &payload.payload) {
                continue;
            }
        }
        let event = build_channel_trigger_event(&payload, emit_link.as_ref());

        if let Some(aggregation_config) = binding.aggregation.as_ref() {
            // Aggregating binding: buffer the event and fire only when the
            // accumulator reports a ready batch.
            let partition_key = crate::triggers::aggregation::partition_key_for_event(
                aggregation_config,
                &payload.payload,
            );
            let binding_key = binding.binding_key();
            let outcome = crate::triggers::aggregation::accumulate(
                &binding_key,
                aggregation_config,
                partition_key.as_deref(),
                event,
            );
            if let crate::triggers::aggregation::AccumulateOutcome::Ready(events) = outcome {
                // Links and summary are taken before `events` is consumed.
                let links = emit_links_from_batch(&events);
                let batch_summary = batch_summary_for_transcript(&events);
                let batched = match crate::triggers::dispatcher::build_batched_event_public(events)
                {
                    Ok(batched) => batched,
                    Err(error) => {
                        return Err(VmError::Runtime(format!(
                            "emit_channel aggregation batch: {error}"
                        )));
                    }
                };
                fire_channel_match(
                    &dispatcher,
                    binding.clone(),
                    batched,
                    resolved,
                    links,
                    Some(batch_summary),
                )
                .await;
            }
            continue;
        }

        let links = emit_links_from_event(&event);
        fire_channel_match(&dispatcher, binding.clone(), event, resolved, links, None).await;
    }
    Ok(())
}
1980
/// Runs one trigger binding against a (possibly batched) channel event and
/// records the match.
///
/// Sequence: open a detached `ChannelMatch` span carrying `links`, write the
/// match transcript event, dispatch the event to the binding, append the
/// match receipt with a summary of the dispatch result, then end the span.
/// The dispatch result itself is not returned to the caller.
async fn fire_channel_match(
    dispatcher: &crate::triggers::Dispatcher,
    binding: std::sync::Arc<crate::triggers::registry::TriggerBinding>,
    event: TriggerEvent,
    resolved: &ResolvedChannel,
    links: Vec<crate::tracing::SpanLink>,
    batch_summary: Option<serde_json::Value>,
) {
    let trigger_id = binding.id.as_str().to_string();
    let handler_kind = binding.handler.kind().to_string();
    // Prefer the dedupe key as the reported event id; fall back to the
    // trigger event's own id when the key is empty.
    let event_id = if event.dedupe_key.is_empty() {
        event.id.0.clone()
    } else {
        event.dedupe_key.clone()
    };
    let mut match_span = ChannelSpanGuard::start_detached(
        crate::tracing::SpanKind::ChannelMatch,
        format!("channel.match {}", resolved.resolved_name),
        links,
    );
    match_span.set_metadata("event_id", serde_json::json!(event_id));
    match_span.set_metadata("trigger_id", serde_json::json!(trigger_id));
    match_span.set_metadata("handler_kind", serde_json::json!(handler_kind));
    match_span.set_metadata("name_resolved", serde_json::json!(resolved.resolved_name));
    if let Some(summary) = batch_summary.as_ref() {
        match_span.set_metadata("batch", summary.clone());
    }
    // `0` stands for "no span" in the transcript and receipt.
    let span_id = crate::tracing::current_span_id().unwrap_or(0);
    let matched_at_ms = harn_clock::offset_datetime_to_ms(crate::clock_mock::now_utc());
    // NOTE(review): when no agent session is current this falls back to the
    // event's tenant id, so the "session id" may actually hold a tenant id —
    // confirm that is intended.
    let matched_in_session_id = crate::agent_sessions::current_session_id()
        .or_else(|| event.tenant_id.as_ref().map(|t| t.0.clone()));
    // The transcript entry is written before the handler runs.
    emit_channel_match_transcript(
        &trigger_id,
        &handler_kind,
        resolved,
        &event_id,
        matched_at_ms,
        matched_in_session_id.as_deref(),
        span_id,
        batch_summary.clone(),
    );
    let dispatch_outcome = dispatcher.dispatch(&binding, event).await;
    let binding_key = binding.binding_key();
    let batch_info = batch_info_from_summary(batch_summary.as_ref());
    record_channel_match_receipt(
        &trigger_id,
        &binding_key,
        &handler_kind,
        resolved,
        &event_id,
        matched_in_session_id.as_deref(),
        batch_info,
        span_id,
        &dispatch_outcome,
    )
    .await;
    // Release the dispatch result before the span is closed.
    drop(dispatch_outcome);
    match_span.end();
}
2058
/// Dispatches aggregation batches whose window has expired.
///
/// Expired batches are drained from the aggregation store first. Entries
/// whose expire action is `Discard` are dropped. For the rest, the binding is
/// resolved from the `<trigger-id>@v<version>` binding key; entries whose key
/// does not parse, whose binding is no longer live, or whose events cannot be
/// batched are skipped silently.
///
/// NOTE(review): the drain happens before the `ctx` check, so when `ctx` is
/// `None` the drained batches are dropped without being dispatched — confirm
/// that this is intended.
pub(crate) async fn flush_expired_aggregations_inner(ctx: Option<&crate::vm::AsyncBuiltinCtx>) {
    let expirations = crate::triggers::aggregation::drain_expired_aggregations();
    if expirations.is_empty() {
        return;
    }
    let Some(base_vm) = ctx.map(crate::vm::AsyncBuiltinCtx::child_vm) else {
        return;
    };
    // Fall back to an in-memory log when none is active on this thread.
    let log = active_event_log()
        .unwrap_or_else(|| install_memory_for_current_thread(CHANNEL_QUEUE_DEPTH));
    let dispatcher = crate::triggers::Dispatcher::with_event_log(base_vm, log);
    for expired in expirations {
        if matches!(
            expired.action,
            crate::triggers::aggregation::ExpireAction::Discard
        ) {
            continue;
        }
        // Split at the last `@v` so trigger ids containing `@v` still parse.
        let Some((trigger_id, version_str)) = expired.binding_key.rsplit_once("@v") else {
            continue;
        };
        let Ok(version) = version_str.parse::<u32>() else {
            continue;
        };
        let Ok(binding) =
            crate::triggers::registry::resolve_live_trigger_binding(trigger_id, Some(version))
        else {
            continue;
        };
        // Everything derived from the events is computed before they are
        // consumed by the batching call below.
        let resolved_for_match = resolved_from_first_event(&expired.events);
        let links = emit_links_from_batch(&expired.events);
        let batch_summary = batch_summary_for_transcript(&expired.events);
        let batched = match crate::triggers::dispatcher::build_batched_event_public(expired.events)
        {
            Ok(batched) => batched,
            Err(_) => continue,
        };
        match resolved_for_match {
            Some(resolved) => {
                fire_channel_match(
                    &dispatcher,
                    binding,
                    batched,
                    &resolved,
                    links,
                    Some(batch_summary),
                )
                .await;
            }
            None => {
                // Channel identity could not be rebuilt: dispatch directly,
                // without the span, transcript and receipt.
                let _ = dispatcher.dispatch(&binding, batched).await;
            }
        }
    }
}
2123
2124fn resolved_from_first_event(events: &[TriggerEvent]) -> Option<ResolvedChannel> {
2130 let first = events.first()?;
2131 let ProviderPayload::Known(KnownProviderPayload::Channel(payload)) = &first.provider_payload
2132 else {
2133 return None;
2134 };
2135 let scope = ChannelScope::parse(&payload.scope).ok()?;
2136 let topic = Topic::new(format!(
2137 "channels.{}.{}.{}",
2138 payload.scope,
2139 sanitize_topic_component(&payload.scope_id),
2140 sanitize_topic_component(&payload.name),
2141 ))
2142 .ok()?;
2143 Some(ResolvedChannel {
2144 scope,
2145 scope_id: payload.scope_id.clone(),
2146 resolved_name: payload.name_resolved.clone(),
2147 topic,
2148 retention: retention_for_scope(scope),
2149 })
2150}
2151
2152fn build_channel_trigger_event(
2153 payload: &ChannelEventPayload,
2154 emit_link: Option<&crate::tracing::SpanLink>,
2155) -> TriggerEvent {
2156 let mut event = TriggerEvent::new(
2157 ProviderId::from("channel"),
2158 "channel.emit",
2159 None,
2160 payload.id.clone(),
2161 payload.tenant_id.clone().map(TenantId::new),
2162 BTreeMap::new(),
2163 ProviderPayload::Known(KnownProviderPayload::Channel(payload.clone())),
2164 SignatureStatus::Unsigned,
2165 );
2166 event.headers.insert(
2167 "harn_channel_name".to_string(),
2168 payload.name_resolved.clone(),
2169 );
2170 event
2171 .headers
2172 .insert("harn_channel_scope".to_string(), payload.scope.clone());
2173 event.headers.insert(
2174 "harn_channel_scope_id".to_string(),
2175 payload.scope_id.clone(),
2176 );
2177 if let Some(link) = emit_link {
2182 event
2183 .headers
2184 .insert(EMIT_TRACE_ID_HEADER.to_string(), link.trace_id.clone());
2185 event
2186 .headers
2187 .insert(EMIT_SPAN_ID_HEADER.to_string(), link.span_id.clone());
2188 }
2189 event
2190}
2191
2192fn channel_filter_matches(filter_raw: &str, payload: &serde_json::Value) -> bool {
2200 let trimmed = filter_raw.trim();
2201 if trimmed.is_empty() {
2202 return true;
2203 }
2204 let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
2205 Ok(value) => value,
2206 Err(_) => return true,
2207 };
2208 let Some(map) = parsed.as_object() else {
2209 return true;
2210 };
2211 map.iter()
2212 .all(|(key, expected)| match payload_path(payload, key) {
2213 Some(actual) => actual == expected,
2214 None => false,
2215 })
2216}
2217
2218fn payload_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2219 let mut current = value;
2220 for segment in path.split('.') {
2221 if segment.is_empty() {
2222 return None;
2223 }
2224 current = match current {
2225 serde_json::Value::Object(map) => map.get(segment)?,
2226 _ => return None,
2227 };
2228 }
2229 Some(current)
2230}
2231
2232#[cfg(test)]
2233mod tests;