1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47 connector_export_denied_builtin_reason, connector_export_effect_class,
48 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61 SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
/// Hard upper bound applied to every outbound connector HTTP request.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
74pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
75 reqwest::Client::builder()
76 .user_agent(user_agent)
77 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
78 .build()
79 .expect("connector HTTP client configuration should be valid")
80}
81
/// Shared, async-locked handle to a boxed connector instance.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
84
thread_local! {
    // Per-thread registry of active connector clients, keyed by string
    // (presumably a provider id — the readers/writers are outside this
    // chunk; confirm against the accessor functions).
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
89
/// A trigger provider integration (Slack, GitHub, cron, …).
///
/// Implementations normalize raw inbound deliveries into [`TriggerEvent`]s
/// and expose an outbound [`ConnectorClient`].
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can produce.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with the shared runtime services in `ctx`.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the given trigger bindings and returns a handle for them.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown within `_deadline`; the default is a no-op.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts one raw inbound delivery into a single normalized event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization entry point that may yield batches, immediate
    /// HTTP responses, or rejections. Defaults to wrapping
    /// [`Connector::normalize_inbound`] in a single-event result.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing the payloads this provider emits.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Client used for outbound calls on behalf of this connector.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
133
/// An HTTP response a connector wants returned to the inbound caller.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    // HTTP status code (e.g. 200, 401).
    pub status: u16,
    // Response headers, name -> value.
    pub headers: BTreeMap<String, String>,
    // JSON response body.
    pub body: JsonValue,
}
141
142impl ConnectorHttpResponse {
143 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
144 Self {
145 status,
146 headers,
147 body,
148 }
149 }
150}
151
/// Outcome of normalizing a raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    // Exactly one normalized event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    // Zero or more normalized events from one delivery.
    Batch(Vec<TriggerEvent>),
    // Events plus an HTTP response that must be sent back immediately
    // (e.g. provider handshakes).
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    // Delivery rejected; only the response is returned, no events.
    Reject(ConnectorHttpResponse),
}
163
164impl ConnectorNormalizeResult {
165 pub fn event(event: TriggerEvent) -> Self {
166 Self::Event(Box::new(event))
167 }
168
169 pub fn into_events(self) -> Vec<TriggerEvent> {
170 match self {
171 Self::Event(event) => vec![*event],
172 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
173 Self::Reject(_) => Vec::new(),
174 }
175 }
176}
177
/// Result of shared post-normalization processing (dedupe claiming).
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    // Event passed dedupe and should be dispatched.
    Ready(Box<TriggerEvent>),
    // Event was a duplicate delivery and must be dropped.
    DuplicateDropped,
}
183
184pub async fn postprocess_normalized_event(
185 inbox: &InboxIndex,
186 binding_id: &str,
187 dedupe_enabled: bool,
188 dedupe_ttl: StdDuration,
189 mut event: TriggerEvent,
190) -> Result<PostNormalizeOutcome, ConnectorError> {
191 if dedupe_enabled && !event.dedupe_claimed() {
192 if !inbox
193 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
194 .await?
195 {
196 return Ok(PostNormalizeOutcome::DuplicateDropped);
197 }
198 event.mark_dedupe_claimed();
199 }
200
201 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
202}
203
/// Outbound RPC surface of a connector: a dynamic method dispatcher
/// taking and returning JSON.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with JSON `args`, returning the JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
209
/// Error from a [`ConnectorClient::call`]; every variant carries a
/// human-readable message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    // The requested method does not exist on this client.
    MethodNotFound(String),
    // Arguments failed validation for the method.
    InvalidArgs(String),
    // Remote side signaled rate limiting.
    RateLimited(String),
    // Network/transport-level failure.
    Transport(String),
    // Anything not covered above.
    Other(String),
}
219
220impl fmt::Display for ClientError {
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 match self {
223 Self::MethodNotFound(message)
224 | Self::InvalidArgs(message)
225 | Self::RateLimited(message)
226 | Self::Transport(message)
227 | Self::Other(message) => message.fmt(f),
228 }
229 }
230}
231
232impl std::error::Error for ClientError {}
233
/// Errors produced by connector registration, verification, normalization,
/// and activation.
#[derive(Debug)]
pub enum ConnectorError {
    // A connector for this provider id is already registered.
    DuplicateProvider(String),
    // The same delivery was seen twice (dedupe rejection).
    DuplicateDelivery(String),
    // No connector registered for this provider id.
    UnknownProvider(String),
    // A required inbound header was absent.
    MissingHeader(String),
    // A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    // Signature verification failed.
    InvalidSignature(String),
    // Signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    // JSON (de)serialization failure.
    Json(String),
    // Secret lookup/decryption failure.
    Secret(String),
    // Event-log append/read failure.
    EventLog(String),
    // Failure inside the harn module runtime.
    HarnRuntime(String),
    // Outbound client call failure.
    Client(ClientError),
    // Operation not supported by this connector.
    Unsupported(String),
    // Binding activation failure.
    Activation(String),
}
259
260impl ConnectorError {
261 pub fn invalid_signature(message: impl Into<String>) -> Self {
262 Self::InvalidSignature(message.into())
263 }
264}
265
impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Structured variants get a descriptive sentence…
            Self::DuplicateProvider(provider) => {
                write!(f, "connector provider `{provider}` is already registered")
            }
            Self::DuplicateDelivery(message) => message.fmt(f),
            Self::UnknownProvider(provider) => {
                write!(f, "connector provider `{provider}` is not registered")
            }
            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
            Self::InvalidHeader { name, detail } => {
                write!(f, "invalid header `{name}`: {detail}")
            }
            // …while message-carrying variants display their message verbatim.
            Self::InvalidSignature(message)
            | Self::Json(message)
            | Self::Secret(message)
            | Self::EventLog(message)
            | Self::HarnRuntime(message)
            | Self::Unsupported(message)
            | Self::Activation(message) => message.fmt(f),
            Self::TimestampOutOfWindow {
                timestamp,
                now,
                window,
            } => write!(
                f,
                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
            ),
            Self::Client(error) => error.fmt(f),
        }
    }
}

impl std::error::Error for ConnectorError {}
301
// Event-log failures surface as `ConnectorError::EventLog` (stringified).
impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

// Secret-provider failures surface as `ConnectorError::Secret`.
impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

// serde_json failures surface as `ConnectorError::Json`.
impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

// Client errors are preserved structurally (not stringified) so callers
// can still match on the `ClientError` variant.
impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
325
/// Shared runtime services handed to each connector at `init` time.
#[derive(Clone)]
pub struct ConnectorCtx {
    // Durable event log shared across the process.
    pub event_log: Arc<AnyEventLog>,
    // Secret lookup for signing keys, tokens, etc.
    pub secrets: Arc<dyn SecretProvider>,
    // Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    // Process-wide metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    // Factory for per-connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
335
/// Point-in-time copy of the registry's fixed (atomic) counters,
/// produced by [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    // Inbox dedupe-index counters.
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    // Provider-specific and dispatch-pipeline counters.
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
352
/// Label name -> label value map attached to a metric sample.
type MetricLabels = BTreeMap<String, String>;
354
/// In-memory Prometheus-style histogram: cumulative buckets keyed by the
/// rendered `le` bound, plus total count and sum of observations.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
361
// Process-wide slot holding the currently installed metrics registry.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
363
364pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
365 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
366 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
367}
368
369pub fn clear_active_metrics_registry() {
370 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
371 *slot.lock().expect("active metrics registry poisoned") = None;
372 }
373}
374
375pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
376 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
377 slot.lock()
378 .expect("active metrics registry poisoned")
379 .clone()
380 })
381}
382
/// Process-wide metrics sink.
///
/// Fixed, hot-path counters are lock-free atomics; everything else
/// (labeled counters, gauges, histograms, pending-event tracking) lives
/// behind mutex-guarded maps and is rendered in Prometheus text format by
/// `render_prometheus`.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Ad-hoc named u64 counters (no labels), rendered with sanitized names.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labeled float counters / gauges / histograms keyed by (name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per-label-set map of pending event_id -> accepted_at_ms, used to
    // derive the oldest-pending-age gauge.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
404
impl MetricsRegistry {
    // Bucket bounds (seconds) for short request/operation durations.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    // Wider bounds (seconds) for trigger lifecycle latencies, up to 5 min.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    // Bucket bounds (bytes) for payload sizes, 128 B to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
413
    /// Copies all fixed atomic counters into an owned snapshot.
    /// Relaxed loads: each value is independently consistent, not a
    /// cross-counter atomic view.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }
432
    /// Counts a new dedupe claim written to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected via the fast path (also bumps the
    /// overall duplicates counter).
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected via the durable store (also bumps the
    /// overall duplicates counter).
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` expired inbox entries; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the live inbox-entry count (gauge semantics).
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }
460
    /// Counts a Linear webhook rejected for a stale/out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful Slack event delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a failed Slack event delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
488
489 pub fn record_custom_counter(&self, name: &str, amount: u64) {
490 if amount == 0 {
491 return;
492 }
493 let mut counters = self
494 .custom_counters
495 .lock()
496 .expect("custom counters poisoned");
497 *counters.entry(name.to_string()).or_default() += amount;
498 }
499
    /// Records one inbound HTTP request: a labeled request counter plus
    /// duration and body-size histograms keyed by endpoint.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }
530
    /// Counts a trigger event received from a provider.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts a trigger event dropped as a duplicate, tagged with why.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Records a predicate evaluation: a counter labeled by boolean result
    /// plus a cost histogram (negative costs are clamped to zero).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts a trigger dispatched to a handler, tagged with the outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts a retry, tagged with the attempt number as a label.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts a trigger event sent to the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
599
600 pub fn record_trigger_accepted_to_normalized(
601 &self,
602 trigger_id: &str,
603 binding_key: &str,
604 provider: &str,
605 tenant_id: Option<&str>,
606 status: &str,
607 duration: StdDuration,
608 ) {
609 self.observe_histogram(
610 "harn_trigger_webhook_accepted_to_normalized_seconds",
611 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
612 duration.as_secs_f64(),
613 &Self::TRIGGER_LATENCY_BUCKETS,
614 );
615 }
616
617 pub fn record_trigger_accepted_to_queue_append(
618 &self,
619 trigger_id: &str,
620 binding_key: &str,
621 provider: &str,
622 tenant_id: Option<&str>,
623 status: &str,
624 duration: StdDuration,
625 ) {
626 self.observe_histogram(
627 "harn_trigger_webhook_accepted_to_queue_append_seconds",
628 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
629 duration.as_secs_f64(),
630 &Self::TRIGGER_LATENCY_BUCKETS,
631 );
632 }
633
634 pub fn record_trigger_queue_age_at_dispatch_admission(
635 &self,
636 trigger_id: &str,
637 binding_key: &str,
638 provider: &str,
639 tenant_id: Option<&str>,
640 status: &str,
641 age: StdDuration,
642 ) {
643 self.observe_histogram(
644 "harn_trigger_queue_age_at_dispatch_admission_seconds",
645 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
646 age.as_secs_f64(),
647 &Self::TRIGGER_LATENCY_BUCKETS,
648 );
649 }
650
651 pub fn record_trigger_queue_age_at_dispatch_start(
652 &self,
653 trigger_id: &str,
654 binding_key: &str,
655 provider: &str,
656 tenant_id: Option<&str>,
657 status: &str,
658 age: StdDuration,
659 ) {
660 self.observe_histogram(
661 "harn_trigger_queue_age_at_dispatch_start_seconds",
662 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
663 age.as_secs_f64(),
664 &Self::TRIGGER_LATENCY_BUCKETS,
665 );
666 }
667
668 pub fn record_trigger_dispatch_runtime(
669 &self,
670 trigger_id: &str,
671 binding_key: &str,
672 provider: &str,
673 tenant_id: Option<&str>,
674 status: &str,
675 duration: StdDuration,
676 ) {
677 self.observe_histogram(
678 "harn_trigger_dispatch_runtime_seconds",
679 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
680 duration.as_secs_f64(),
681 &Self::TRIGGER_LATENCY_BUCKETS,
682 );
683 }
684
685 pub fn record_trigger_retry_delay(
686 &self,
687 trigger_id: &str,
688 binding_key: &str,
689 provider: &str,
690 tenant_id: Option<&str>,
691 status: &str,
692 duration: StdDuration,
693 ) {
694 self.observe_histogram(
695 "harn_trigger_retry_delay_seconds",
696 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
697 duration.as_secs_f64(),
698 &Self::TRIGGER_LATENCY_BUCKETS,
699 );
700 }
701
702 pub fn record_trigger_accepted_to_dlq(
703 &self,
704 trigger_id: &str,
705 binding_key: &str,
706 provider: &str,
707 tenant_id: Option<&str>,
708 status: &str,
709 duration: StdDuration,
710 ) {
711 self.observe_histogram(
712 "harn_trigger_accepted_to_dlq_seconds",
713 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
714 duration.as_secs_f64(),
715 &Self::TRIGGER_LATENCY_BUCKETS,
716 );
717 }
718
    /// Registers a pending trigger event (keyed by `event_id`) under its
    /// pending-label set, then refreshes the oldest-pending-age gauge.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Scoped so the lock is dropped before the gauge refresh, which
            // re-acquires the same `pending_trigger_events` mutex.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
742
    /// Removes a previously noted pending event and refreshes the
    /// oldest-pending-age gauge. Empty per-label maps are pruned so the
    /// pending table does not retain dead label sets.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Scoped so the lock is dropped before the gauge refresh, which
            // re-acquires the same `pending_trigger_events` mutex.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
767
    /// Gauge: number of in-flight dispatches for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Gauge: today's accumulated cost for a trigger; negatives clamp to 0.
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts a trigger hitting its budget, tagged with the strategy applied.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counts a backpressure action taken on some dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
799
    /// Records one event-log append: a duration histogram plus a size gauge.
    // NOTE(review): the "topic_size_bytes" gauge is set to the size of the
    // *most recent* append payload, not a cumulative topic size as the
    // metric name suggests — confirm intent with the dashboard consumers.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Gauge: consumer lag for a (topic, consumer) pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }
826
    /// Records one agent-to-agent hop: an outcome-labeled counter plus a
    /// per-target duration histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Gauge: current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Histogram: how old an item was when claimed from the queue.
    /// Negative ages (clock skew) clamp to zero.
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
857
858 pub fn record_scheduler_selection(
860 &self,
861 queue: &str,
862 fairness_dimension: &str,
863 fairness_key: &str,
864 ) {
865 self.increment_counter(
866 "harn_scheduler_selections_total",
867 labels([
868 ("queue", queue),
869 ("fairness_dimension", fairness_dimension),
870 ("fairness_key", fairness_key),
871 ]),
872 1,
873 );
874 }
875
876 pub fn record_scheduler_deferral(
879 &self,
880 queue: &str,
881 fairness_dimension: &str,
882 fairness_key: &str,
883 ) {
884 self.increment_counter(
885 "harn_scheduler_deferrals_total",
886 labels([
887 ("queue", queue),
888 ("fairness_dimension", fairness_dimension),
889 ("fairness_key", fairness_key),
890 ]),
891 1,
892 );
893 }
894
895 pub fn record_scheduler_starvation_promotion(
897 &self,
898 queue: &str,
899 fairness_dimension: &str,
900 fairness_key: &str,
901 ) {
902 self.increment_counter(
903 "harn_scheduler_starvation_promotions_total",
904 labels([
905 ("queue", queue),
906 ("fairness_dimension", fairness_dimension),
907 ("fairness_key", fairness_key),
908 ]),
909 1,
910 );
911 }
912
913 pub fn set_scheduler_deficit(
915 &self,
916 queue: &str,
917 fairness_dimension: &str,
918 fairness_key: &str,
919 deficit: i64,
920 ) {
921 self.set_gauge(
922 "harn_scheduler_deficit",
923 labels([
924 ("queue", queue),
925 ("fairness_dimension", fairness_dimension),
926 ("fairness_key", fairness_key),
927 ]),
928 deficit as f64,
929 );
930 }
931
932 pub fn set_scheduler_oldest_eligible_age(
934 &self,
935 queue: &str,
936 fairness_dimension: &str,
937 fairness_key: &str,
938 age_ms: u64,
939 ) {
940 self.set_gauge(
941 "harn_scheduler_oldest_eligible_age_seconds",
942 labels([
943 ("queue", queue),
944 ("fairness_dimension", fairness_dimension),
945 ("fairness_key", fairness_key),
946 ]),
947 age_ms as f64 / 1000.0,
948 );
949 }
950
    /// Gauge: events waiting behind the orchestrator pump for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Gauge: events admitted by the pump but not yet completed.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Histogram: how long an event waited before pump admission.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
975
    /// Records one LLM call: an outcome-labeled counter plus the cost
    /// counter. When no cost accrued, `ensure_counter` still creates the
    /// cost series at zero so the metric is always present for scraping.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts an LLM response served from cache.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }
1007
    /// Renders the whole registry in Prometheus text exposition format:
    /// legacy fixed counters first, then sanitized custom counters, two
    /// hard-coded Slack auto-disable config gauges, and finally the
    /// generic labeled counter/gauge/histogram families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Legacy unlabeled counters rendered under their historical names.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize arbitrary custom names into a valid metric name:
            // any non-alphanumeric/underscore character becomes '_'.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Fixed configuration gauges for the Slack auto-disable policy.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
1072
1073 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1074 let amount = amount.into();
1075 if amount <= 0.0 || !amount.is_finite() {
1076 return;
1077 }
1078 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1079 *counters.entry((name.to_string(), labels)).or_default() += amount;
1080 }
1081
    /// Creates the counter series at zero if absent, without incrementing,
    /// so the metric is visible to scrapers before its first increment.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Last-write-wins set of a labeled gauge.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }
1091
    /// Records one observation into a labeled histogram with Prometheus
    /// cumulative-bucket semantics: every bucket whose bound >= value is
    /// incremented, and `+Inf` always is. Non-finite values are dropped.
    //
    // NOTE(review): the bucket set is fixed by `bucket_bounds` on the FIRST
    // observation for a given (name, labels); a later call with different
    // bounds inserts extra bucket keys via `entry(..).or_default()`, which
    // would render an inconsistent bucket list. Callers currently always
    // pass the same bounds per metric name — keep it that way.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1124
    /// Recomputes the oldest-pending-age gauge for one label set: the age
    /// of the earliest `accepted_at_ms` still pending, or 0 when none are.
    // `millis_delta` is defined elsewhere in this file; presumably it
    // saturates when `accepted_at_ms > now_ms` — confirm.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }
1141
    /// Appends the generic labeled families (counters, gauges, histograms)
    /// to `rendered` in Prometheus text format. Only families listed by
    /// `metric_family_names` are emitted; unknown names are silently
    /// skipped, so new metrics must also be added to that list.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone the maps so the locks are held only briefly, not for the
        // whole (string-building) render.
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Each histogram renders `<name>_bucket{le=...}` samples
                // followed by `<name>_sum` and `<name>_count`.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1203}
1204
/// Selects which family table `metric_family_names` returns when rendering
/// the Prometheus exposition output.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1211
/// Static catalog of every metric family this registry can export, grouped
/// by kind. Rendering iterates these lists so that `# TYPE` headers are
/// emitted even for families with no samples yet, in a stable order.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
            "harn_scheduler_selections_total",
            "harn_scheduler_deferrals_total",
            "harn_scheduler_starvation_promotions_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
            "harn_scheduler_deficit",
            "harn_scheduler_oldest_eligible_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1262
1263fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1264 pairs
1265 .into_iter()
1266 .map(|(name, value)| (name.to_string(), value.to_string()))
1267 .collect()
1268}
1269
1270fn trigger_lifecycle_labels(
1271 trigger_id: &str,
1272 binding_key: &str,
1273 provider: &str,
1274 tenant_id: Option<&str>,
1275 status: &str,
1276) -> MetricLabels {
1277 labels([
1278 ("binding_key", binding_key),
1279 ("provider", provider),
1280 ("status", status),
1281 ("tenant_id", tenant_label(tenant_id)),
1282 ("trigger_id", trigger_id),
1283 ])
1284}
1285
1286fn trigger_pending_labels(
1287 trigger_id: &str,
1288 binding_key: &str,
1289 provider: &str,
1290 tenant_id: Option<&str>,
1291) -> MetricLabels {
1292 labels([
1293 ("binding_key", binding_key),
1294 ("provider", provider),
1295 ("tenant_id", tenant_label(tenant_id)),
1296 ("trigger_id", trigger_id),
1297 ])
1298}
1299
/// Normalize an optional tenant id into a label value: trimmed when
/// present and non-blank, otherwise the literal `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1306
/// Non-negative difference between two millisecond timestamps as a
/// `Duration`; clamps to zero when `later_ms` is not actually later.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = u64::try_from(later_ms.saturating_sub(earlier_ms)).unwrap_or(0);
    StdDuration::from_millis(delta_ms)
}
1310
1311fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1312 rendered.push_str(name);
1313 if !labels.is_empty() {
1314 rendered.push('{');
1315 for (index, (label, label_value)) in labels.iter().enumerate() {
1316 if index > 0 {
1317 rendered.push(',');
1318 }
1319 rendered.push_str(label);
1320 rendered.push_str("=\"");
1321 rendered.push_str(&escape_label_value(label_value));
1322 rendered.push('"');
1323 }
1324 rendered.push('}');
1325 }
1326 rendered.push(' ');
1327 rendered.push_str(&prometheus_float(value));
1328 rendered.push('\n');
1329}
1330
/// Escape a label value for the Prometheus text exposition format.
///
/// Backslash, double quote, and newline are the only characters that need
/// escaping inside a quoted label value; everything else passes through.
fn escape_label_value(value: &str) -> String {
    // Build directly into one buffer instead of allocating a Vec<char>
    // per character as the previous flat_map-based version did.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1342
/// Format a float the way the Prometheus text format expects.
///
/// Non-finite values use Prometheus' canonical spellings (`+Inf`, `-Inf`,
/// `NaN`); integral values render without a decimal point; everything else
/// is printed with up to six fractional digits, trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        // `format!("{:.6}", NAN)` happened to yield "NaN" before; make it
        // explicit so the contract does not depend on std formatting.
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Previously negative infinity fell through to the generic branch
        // and rendered as "-inf", which the Prometheus text format does
        // not accept; "-Inf" is the canonical spelling.
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1357
/// Describes a provider's normalized payload: a Harn schema name plus an
/// optional JSON-schema document (`Null` when the schema is name-only).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    pub harn_schema_name: String,
    #[serde(default)]
    pub json_schema: JsonValue,
}
1365
1366impl ProviderPayloadSchema {
1367 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1368 Self {
1369 harn_schema_name: harn_schema_name.into(),
1370 json_schema,
1371 }
1372 }
1373
1374 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1375 Self::new(harn_schema_name, JsonValue::Null)
1376 }
1377}
1378
impl Default for ProviderPayloadSchema {
    /// The name-only `"raw"` schema.
    fn default() -> Self {
        Self::named("raw")
    }
}
1384
/// Newtype over a trigger-kind string (e.g. "webhook"); serialized
/// transparently as the inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1389
1390impl TriggerKind {
1391 pub fn new(value: impl Into<String>) -> Self {
1392 Self(value.into())
1393 }
1394
1395 pub fn as_str(&self) -> &str {
1396 self.0.as_str()
1397 }
1398}
1399
/// Convenience conversion from a borrowed string.
impl From<&str> for TriggerKind {
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}
1405
/// Convenience conversion from an owned string.
impl From<String> for TriggerKind {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}
1411
/// A registered trigger subscription: the provider and kind it listens to,
/// its identity, dedupe settings, and provider-specific configuration.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    pub provider: ProviderId,
    pub kind: TriggerKind,
    pub binding_id: String,
    // Optional dedupe key override; exact semantics are provider-specific
    // — NOTE(review): confirm against the trigger inbox docs.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    // Retention window for dedupe entries, in days; defaults to the
    // crate-wide inbox retention constant.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    #[serde(default)]
    pub config: JsonValue,
}
1425
1426impl TriggerBinding {
1427 pub fn new(
1428 provider: ProviderId,
1429 kind: impl Into<TriggerKind>,
1430 binding_id: impl Into<String>,
1431 ) -> Self {
1432 Self {
1433 provider,
1434 kind: kind.into(),
1435 binding_id: binding_id.into(),
1436 dedupe_key: None,
1437 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1438 config: JsonValue::Null,
1439 }
1440 }
1441}
1442
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1446
/// In-memory collection of trigger bindings grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1452
1453impl TriggerRegistry {
1454 pub fn register(&mut self, binding: TriggerBinding) {
1455 self.bindings
1456 .entry(binding.provider.clone())
1457 .or_default()
1458 .push(binding);
1459 }
1460
1461 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1462 &self.bindings
1463 }
1464
1465 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1466 self.bindings
1467 .get(provider)
1468 .map(Vec::as_slice)
1469 .unwrap_or(&[])
1470 }
1471}
1472
/// Result of activating one connector: the provider plus how many trigger
/// bindings were activated for it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    pub provider: ProviderId,
    pub binding_count: usize,
}
1479
1480impl ActivationHandle {
1481 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1482 Self {
1483 provider,
1484 binding_count,
1485 }
1486 }
1487}
1488
/// A raw inbound delivery (headers, query parameters, body bytes) captured
/// before provider-specific normalization into a `TriggerEvent`.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    pub kind: String,
    pub headers: BTreeMap<String, String>,
    pub query: BTreeMap<String, String>,
    pub body: Vec<u8>,
    // When the payload arrived at this process.
    pub received_at: OffsetDateTime,
    // When the upstream event happened, if the provider reports it.
    pub occurred_at: Option<OffsetDateTime>,
    pub tenant_id: Option<TenantId>,
    pub metadata: JsonValue,
}
1501
1502impl RawInbound {
1503 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1504 Self {
1505 kind: kind.into(),
1506 headers,
1507 query: BTreeMap::new(),
1508 body,
1509 received_at: clock::now_utc(),
1510 occurred_at: None,
1511 tenant_id: None,
1512 metadata: JsonValue::Null,
1513 }
1514 }
1515
1516 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1517 Ok(serde_json::from_slice(&self.body)?)
1518 }
1519}
1520
/// Token-bucket parameters: burst `capacity`, and `refill_tokens` added
/// once per `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}
1528
1529impl Default for RateLimitConfig {
1530 fn default() -> Self {
1531 Self {
1532 capacity: 60,
1533 refill_tokens: 1,
1534 refill_interval: StdDuration::from_secs(1),
1535 }
1536 }
1537}
1538
/// Mutable token-bucket state: a fractional token balance plus the instant
/// of the last refill.
#[derive(Clone, Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: ClockInstant,
}
1544
impl TokenBucket {
    /// A bucket starting at full capacity, timestamped "now".
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Credit tokens for the time elapsed since the last refill, capped at
    /// capacity. The `f64::EPSILON` / `max(1)` floors keep a zero interval,
    /// zero refill rate, or zero capacity from dividing by zero or stalling
    /// the bucket forever.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refill, then spend one token if at least one whole token is present.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimate how long until one whole token is available, with a 1 ms
    /// floor so callers never busy-spin on a zero-length sleep.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1581
/// Shared token-bucket rate limiter: one bucket per `(provider, key)`
/// pair, all using the same `RateLimitConfig`.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1588
1589impl RateLimiterFactory {
1590 pub fn new(config: RateLimitConfig) -> Self {
1591 Self {
1592 config,
1593 buckets: Mutex::new(HashMap::new()),
1594 }
1595 }
1596
1597 pub fn config(&self) -> RateLimitConfig {
1598 self.config
1599 }
1600
1601 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1602 ScopedRateLimiter {
1603 factory: self,
1604 provider: provider.clone(),
1605 key: key.into(),
1606 }
1607 }
1608
1609 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1610 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1611 let bucket = buckets
1612 .entry((provider.as_str().to_string(), key.to_string()))
1613 .or_insert_with(|| TokenBucket::full(self.config));
1614 bucket.try_acquire(self.config, clock::instant_now())
1615 }
1616
1617 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1618 loop {
1619 let wait = {
1620 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1621 let bucket = buckets
1622 .entry((provider.as_str().to_string(), key.to_string()))
1623 .or_insert_with(|| TokenBucket::full(self.config));
1624 if bucket.try_acquire(self.config, clock::instant_now()) {
1625 return;
1626 }
1627 bucket.wait_duration(self.config)
1628 };
1629 tokio::time::sleep(wait).await;
1630 }
1631 }
1632}
1633
impl Default for RateLimiterFactory {
    /// Factory using the default 60-token / one-per-second configuration.
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1639
/// Borrowed view of a `RateLimiterFactory` pinned to one provider/key pair.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    factory: &'a RateLimiterFactory,
    provider: ProviderId,
    key: String,
}
1647
1648impl<'a> ScopedRateLimiter<'a> {
1649 pub fn try_acquire(&self) -> bool {
1650 self.factory.try_acquire(&self.provider, &self.key)
1651 }
1652
1653 pub async fn acquire(&self) {
1654 self.factory.acquire(&self.provider, &self.key).await;
1655 }
1656}
1657
/// Registry of connectors keyed by provider id; each connector is wrapped
/// in an async mutex (via `ConnectorHandle`) so it can be shared across
/// tasks.
pub struct ConnectorRegistry {
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1662
1663impl ConnectorRegistry {
1664 pub fn empty() -> Self {
1665 Self {
1666 connectors: BTreeMap::new(),
1667 }
1668 }
1669
1670 pub fn with_defaults() -> Self {
1671 let mut registry = Self::empty();
1672 for provider in registered_provider_metadata() {
1673 registry
1674 .register(default_connector_for_provider(&provider))
1675 .expect("default connector registration should not fail");
1676 }
1677 registry
1678 }
1679
1680 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1681 let provider = connector.provider_id().clone();
1682 if self.connectors.contains_key(&provider) {
1683 return Err(ConnectorError::DuplicateProvider(provider.0));
1684 }
1685 self.connectors
1686 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1687 Ok(())
1688 }
1689
1690 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1691 self.connectors.get(id).cloned()
1692 }
1693
1694 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1695 self.connectors.remove(id)
1696 }
1697
1698 pub fn list(&self) -> Vec<ProviderId> {
1699 self.connectors.keys().cloned().collect()
1700 }
1701
1702 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1703 for connector in self.connectors.values() {
1704 connector.lock().await.init(ctx.clone()).await?;
1705 }
1706 Ok(())
1707 }
1708
1709 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1710 let mut clients = BTreeMap::new();
1711 for (provider, connector) in &self.connectors {
1712 let client = connector.lock().await.client();
1713 clients.insert(provider.clone(), client);
1714 }
1715 clients
1716 }
1717
1718 pub async fn activate_all(
1719 &self,
1720 registry: &TriggerRegistry,
1721 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1722 let mut handles = Vec::new();
1723 for (provider, connector) in &self.connectors {
1724 let bindings = registry.bindings_for(provider);
1725 if bindings.is_empty() {
1726 continue;
1727 }
1728 let connector = connector.lock().await;
1729 handles.push(connector.activate(bindings).await?);
1730 }
1731 Ok(handles)
1732 }
1733}
1734
impl Default for ConnectorRegistry {
    /// Equivalent to [`ConnectorRegistry::with_defaults`].
    fn default() -> Self {
        Self::with_defaults()
    }
}
1740
1741fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1742 if provider.provider == "github" {
1752 return Box::new(GitHubConnector::new());
1753 }
1754 if provider.provider == "linear" {
1755 return Box::new(LinearConnector::new());
1756 }
1757 if provider.provider == "slack" {
1758 return Box::new(SlackConnector::new());
1759 }
1760 if provider.provider == "notion" {
1761 return Box::new(NotionConnector::new());
1762 }
1763 if provider.provider == "a2a-push" {
1764 return Box::new(A2aPushConnector::new());
1765 }
1766 match &provider.runtime {
1767 ProviderRuntimeMetadata::Builtin {
1768 connector,
1769 default_signature_variant,
1770 } => match connector.as_str() {
1771 "cron" => Box::new(CronConnector::new()),
1772 "stream" => Box::new(StreamConnector::new(
1773 ProviderId::from(provider.provider.clone()),
1774 provider.schema_name.clone(),
1775 )),
1776 "webhook" => {
1777 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1778 .expect("catalog webhook signature variant must be valid");
1779 Box::new(GenericWebhookConnector::with_profile(
1780 WebhookProviderProfile::new(
1781 ProviderId::from(provider.provider.clone()),
1782 provider.schema_name.clone(),
1783 variant,
1784 ),
1785 ))
1786 }
1787 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1788 },
1789 ProviderRuntimeMetadata::Placeholder => {
1790 Box::new(PlaceholderConnector::from_metadata(provider))
1791 }
1792 }
1793}
1794
/// Inert connector for providers that are cataloged but have no concrete
/// implementation yet; advertises metadata but cannot normalize events.
struct PlaceholderConnector {
    provider_id: ProviderId,
    kinds: Vec<TriggerKind>,
    schema_name: String,
}
1800
1801impl PlaceholderConnector {
1802 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1803 Self {
1804 provider_id: ProviderId::from(metadata.provider.clone()),
1805 kinds: metadata
1806 .kinds
1807 .iter()
1808 .cloned()
1809 .map(TriggerKind::from)
1810 .collect(),
1811 schema_name: metadata.schema_name.clone(),
1812 }
1813 }
1814}
1815
/// Client counterpart of `PlaceholderConnector`; every call fails.
struct PlaceholderClient;
1817
#[async_trait]
impl ConnectorClient for PlaceholderClient {
    // Always errors: placeholder providers expose no callable methods.
    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
        Err(ClientError::Other(format!(
            "connector client method '{method}' is not implemented for this provider"
        )))
    }
}
1826
#[async_trait]
impl Connector for PlaceholderConnector {
    fn provider_id(&self) -> &ProviderId {
        &self.provider_id
    }

    fn kinds(&self) -> &[TriggerKind] {
        &self.kinds
    }

    // No setup is needed for a placeholder.
    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
        Ok(())
    }

    // Activation trivially succeeds; it only reports the binding count.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError> {
        Ok(ActivationHandle::new(
            self.provider_id.clone(),
            bindings.len(),
        ))
    }

    // Inbound payloads cannot be normalized until a concrete connector
    // exists for this provider, so this always errors.
    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
        Err(ConnectorError::Unsupported(format!(
            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
            self.provider_id.as_str()
        )))
    }

    fn payload_schema(&self) -> ProviderPayloadSchema {
        ProviderPayloadSchema::named(self.schema_name.clone())
    }

    fn client(&self) -> Arc<dyn ConnectorClient> {
        Arc::new(PlaceholderClient)
    }
}
1866
1867pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1868 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1869 *slot.borrow_mut() = clients
1870 .into_iter()
1871 .map(|(provider, client)| (provider.as_str().to_string(), client))
1872 .collect();
1873 });
1874}
1875
/// Look up the client installed for `provider` on this thread, if any.
pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
}
1879
/// Drop every client installed on this thread.
pub fn clear_active_connector_clients() {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
}
1883
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Stub client that just echoes the requested method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Connector double that records how many times `activate` runs.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Count the call, then report success with the binding count.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering the same provider id twice must fail with
    // DuplicateProvider carrying the offending id.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // activate_all skips connectors with no trigger bindings: only the
    // github connector (two bindings) is activated, slack is untouched.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // With capacity 1 and a 60s refill, a second acquire on the same
    // (provider, key) fails while distinct scopes keep their own buckets.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // json_body parses the stored bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is seeded from the provider catalog.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // End-to-end smoke test of the metrics recorder API: record one sample
    // in every family, then assert the exact Prometheus exposition lines.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}