1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47 connector_export_denied_builtin_reason, connector_export_effect_class,
48 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61 SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
72const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
74pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
75 reqwest::Client::builder()
76 .user_agent(user_agent)
77 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
78 .redirect(reqwest::redirect::Policy::custom(|attempt| {
79 if attempt.previous().len() >= 10 {
80 attempt.error("too many redirects")
81 } else if crate::egress::redirect_url_allowed(
82 "connector_redirect",
83 attempt.url().as_str(),
84 ) {
85 attempt.follow()
86 } else {
87 attempt.error("egress policy blocked redirect target")
88 }
89 }))
90 .build()
91 .expect("connector HTTP client configuration should be valid")
92}
93
/// Shared handle to a boxed connector; the async mutex serializes lifecycle
/// calls (init/activate/shutdown) made from different tasks.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map of connector clients keyed by name.
    // NOTE(review): populated/read elsewhere in this file — usage not visible
    // in this chunk; confirm the keying scheme against the accessors.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
101
102#[async_trait]
104pub trait Connector: Send + Sync {
105 fn provider_id(&self) -> &ProviderId;
107
108 fn kinds(&self) -> &[TriggerKind];
110
111 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
113
114 async fn activate(
116 &self,
117 bindings: &[TriggerBinding],
118 ) -> Result<ActivationHandle, ConnectorError>;
119
120 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
122 Ok(())
123 }
124
125 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
127
128 async fn normalize_inbound_result(
131 &self,
132 raw: RawInbound,
133 ) -> Result<ConnectorNormalizeResult, ConnectorError> {
134 self.normalize_inbound(raw)
135 .await
136 .map(ConnectorNormalizeResult::event)
137 }
138
139 fn payload_schema(&self) -> ProviderPayloadSchema;
141
142 fn client(&self) -> Arc<dyn ConnectorClient>;
144}
145
/// Concrete HTTP response a connector wants returned to the webhook caller,
/// held as owned parts so it can cross task boundaries.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    pub status: u16,
    pub headers: BTreeMap<String, String>,
    pub body: JsonValue,
}

impl ConnectorHttpResponse {
    /// Bundles the three response parts; no validation is performed here.
    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
        Self {
            status,
            headers,
            body,
        }
    }
}
163
/// Outcome of normalizing one inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one trigger event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Zero or more events produced from a single delivery.
    Batch(Vec<TriggerEvent>),
    /// A response that must be returned to the caller right away, plus any
    /// events to enqueue alongside it.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Delivery rejected outright with the given response; produces no events.
    Reject(ConnectorHttpResponse),
}

impl ConnectorNormalizeResult {
    /// Wraps a single event in the `Event` variant.
    pub fn event(event: TriggerEvent) -> Self {
        Self::Event(Box::new(event))
    }

    /// Flattens the result into the list of events to enqueue; `Reject`
    /// yields an empty list.
    pub fn into_events(self) -> Vec<TriggerEvent> {
        match self {
            Self::Event(event) => vec![*event],
            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
            Self::Reject(_) => Vec::new(),
        }
    }
}
189
/// Result of dedupe post-processing for a normalized event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event won (or skipped) the dedupe claim and may be dispatched.
    Ready(Box<TriggerEvent>),
    /// Another delivery already holds this dedupe key; drop the event.
    DuplicateDropped,
}
195
196pub async fn postprocess_normalized_event(
197 inbox: &InboxIndex,
198 binding_id: &str,
199 dedupe_enabled: bool,
200 dedupe_ttl: StdDuration,
201 mut event: TriggerEvent,
202) -> Result<PostNormalizeOutcome, ConnectorError> {
203 if dedupe_enabled && !event.dedupe_claimed() {
204 if !inbox
205 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
206 .await?
207 {
208 return Ok(PostNormalizeOutcome::DuplicateDropped);
209 }
210 event.mark_dedupe_claimed();
211 }
212
213 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
214}
215
/// RPC-style interface a connector exposes to handler code: a single
/// `call(method, args)` returning JSON.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
221
/// Errors surfaced by [`ConnectorClient::call`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    /// Outbound request refused by the crate's egress policy.
    EgressBlocked(crate::egress::EgressBlocked),
    Other(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // All message-carrying variants display their message verbatim.
            Self::MethodNotFound(message)
            | Self::InvalidArgs(message)
            | Self::RateLimited(message)
            | Self::Transport(message)
            | Self::Other(message) => message.fmt(f),
            Self::EgressBlocked(blocked) => blocked.fmt(f),
        }
    }
}

impl std::error::Error for ClientError {}
247
/// Error type shared by connector verification, normalization, activation,
/// and delegated client calls.
#[derive(Debug)]
pub enum ConnectorError {
    DuplicateProvider(String),
    DuplicateDelivery(String),
    UnknownProvider(String),
    /// A required HTTP header was absent from the inbound request.
    MissingHeader(String),
    InvalidHeader {
        name: String,
        detail: String,
    },
    InvalidSignature(String),
    /// Signature timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    Json(String),
    Secret(String),
    EventLog(String),
    HarnRuntime(String),
    /// Wraps a failure from a delegated [`ConnectorClient`] call.
    Client(ClientError),
    Unsupported(String),
    Activation(String),
}

impl ConnectorError {
    /// Convenience constructor for the `InvalidSignature` variant.
    pub fn invalid_signature(message: impl Into<String>) -> Self {
        Self::InvalidSignature(message.into())
    }
}
279
impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DuplicateProvider(provider) => {
                write!(f, "connector provider `{provider}` is already registered")
            }
            Self::DuplicateDelivery(message) => message.fmt(f),
            Self::UnknownProvider(provider) => {
                write!(f, "connector provider `{provider}` is not registered")
            }
            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
            Self::InvalidHeader { name, detail } => {
                write!(f, "invalid header `{name}`: {detail}")
            }
            // These variants all carry a preformatted message; print it as-is.
            Self::InvalidSignature(message)
            | Self::Json(message)
            | Self::Secret(message)
            | Self::EventLog(message)
            | Self::HarnRuntime(message)
            | Self::Unsupported(message)
            | Self::Activation(message) => message.fmt(f),
            Self::TimestampOutOfWindow {
                timestamp,
                now,
                window,
            } => write!(
                f,
                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
            ),
            // Delegate to the wrapped client error's own Display.
            Self::Client(error) => error.fmt(f),
        }
    }
}

impl std::error::Error for ConnectorError {}
315
// Event-log failures are flattened to their message (source chain is dropped).
impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

// Secret-provider failures are flattened to their message.
impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

// JSON (de)serialization failures are flattened to their message.
impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

// Client errors are kept structured so callers can still match on the kind.
impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
339
/// Shared runtime dependencies handed to each connector at `init` time.
#[derive(Clone)]
pub struct ConnectorCtx {
    pub event_log: Arc<AnyEventLog>,
    pub secrets: Arc<dyn SecretProvider>,
    pub inbox: Arc<InboxIndex>,
    pub metrics: Arc<MetricsRegistry>,
    pub rate_limiter: Arc<RateLimiterFactory>,
}
349
/// Point-in-time copy of the registry's fixed atomic counters, taken with
/// relaxed loads (values may be mutually slightly stale).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
366
// Label set identifying one metric series; BTreeMap keeps rendering stable.
type MetricLabels = BTreeMap<String, String>;

/// Cumulative histogram state: per-upper-bound bucket counts (keyed by the
/// rendered `le` value), total observation count, and sum of observations.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
375
376static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
377
378pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
379 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
380 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
381}
382
383pub fn clear_active_metrics_registry() {
384 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
385 *slot.lock().expect("active metrics registry poisoned") = None;
386 }
387}
388
389pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
390 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
391 slot.lock()
392 .expect("active metrics registry poisoned")
393 .clone()
394 })
395}
396
/// Central metrics store: fixed hot-path counters as lock-free atomics, plus
/// mutex-guarded maps for labelled counters, gauges, histograms, and the
/// pending-trigger-event bookkeeping used for the oldest-pending-age gauge.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form named counters (rendered as connector_custom_<name>_total).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // label-set -> (event_id -> accepted_at_ms) for pending trigger events.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
418
419impl MetricsRegistry {
    // Histogram bucket upper bounds (seconds) for short-lived operations.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    // Wider bounds (seconds) for end-to-end trigger lifecycle latencies.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    // Bucket bounds (bytes) for payload sizes, 128 B up to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Copies all fixed atomic counters into an owned snapshot using relaxed
    /// loads; individual values may be mutually slightly out of sync.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }
446
    /// Counts one dedupe claim written to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the durable store.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` expired inbox entries (no-op for zero).
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the live inbox-entry gauge with the current count.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts a Linear webhook rejected for a stale/ahead timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful Slack event delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed Slack event delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
502
503 pub fn record_custom_counter(&self, name: &str, amount: u64) {
504 if amount == 0 {
505 return;
506 }
507 let mut counters = self
508 .custom_counters
509 .lock()
510 .expect("custom counters poisoned");
511 *counters.entry(name.to_string()).or_default() += amount;
512 }
513
514 pub fn record_http_request(
515 &self,
516 endpoint: &str,
517 method: &str,
518 status: u16,
519 duration: StdDuration,
520 body_size_bytes: usize,
521 ) {
522 self.increment_counter(
523 "harn_http_requests_total",
524 labels([
525 ("endpoint", endpoint),
526 ("method", method),
527 ("status", &status.to_string()),
528 ]),
529 1,
530 );
531 self.observe_histogram(
532 "harn_http_request_duration_seconds",
533 labels([("endpoint", endpoint)]),
534 duration.as_secs_f64(),
535 &Self::DURATION_BUCKETS,
536 );
537 self.observe_histogram(
538 "harn_http_body_size_bytes",
539 labels([("endpoint", endpoint)]),
540 body_size_bytes as f64,
541 &Self::SIZE_BUCKETS,
542 );
543 }
544
    /// Counts one inbound trigger event for the (trigger, provider) pair.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts one event dropped by dedupe, labelled with the reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Counts one predicate evaluation (labelled by its boolean result) and
    /// observes its cost in USD (clamped at zero; includes a 0.0 bucket so
    /// free evaluations remain distinguishable).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts one dispatch, labelled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts one retry, labelled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts one event routed to the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
613
614 pub fn record_trigger_accepted_to_normalized(
615 &self,
616 trigger_id: &str,
617 binding_key: &str,
618 provider: &str,
619 tenant_id: Option<&str>,
620 status: &str,
621 duration: StdDuration,
622 ) {
623 self.observe_histogram(
624 "harn_trigger_webhook_accepted_to_normalized_seconds",
625 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
626 duration.as_secs_f64(),
627 &Self::TRIGGER_LATENCY_BUCKETS,
628 );
629 }
630
631 pub fn record_trigger_accepted_to_queue_append(
632 &self,
633 trigger_id: &str,
634 binding_key: &str,
635 provider: &str,
636 tenant_id: Option<&str>,
637 status: &str,
638 duration: StdDuration,
639 ) {
640 self.observe_histogram(
641 "harn_trigger_webhook_accepted_to_queue_append_seconds",
642 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
643 duration.as_secs_f64(),
644 &Self::TRIGGER_LATENCY_BUCKETS,
645 );
646 }
647
648 pub fn record_trigger_queue_age_at_dispatch_admission(
649 &self,
650 trigger_id: &str,
651 binding_key: &str,
652 provider: &str,
653 tenant_id: Option<&str>,
654 status: &str,
655 age: StdDuration,
656 ) {
657 self.observe_histogram(
658 "harn_trigger_queue_age_at_dispatch_admission_seconds",
659 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
660 age.as_secs_f64(),
661 &Self::TRIGGER_LATENCY_BUCKETS,
662 );
663 }
664
665 pub fn record_trigger_queue_age_at_dispatch_start(
666 &self,
667 trigger_id: &str,
668 binding_key: &str,
669 provider: &str,
670 tenant_id: Option<&str>,
671 status: &str,
672 age: StdDuration,
673 ) {
674 self.observe_histogram(
675 "harn_trigger_queue_age_at_dispatch_start_seconds",
676 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
677 age.as_secs_f64(),
678 &Self::TRIGGER_LATENCY_BUCKETS,
679 );
680 }
681
682 pub fn record_trigger_dispatch_runtime(
683 &self,
684 trigger_id: &str,
685 binding_key: &str,
686 provider: &str,
687 tenant_id: Option<&str>,
688 status: &str,
689 duration: StdDuration,
690 ) {
691 self.observe_histogram(
692 "harn_trigger_dispatch_runtime_seconds",
693 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
694 duration.as_secs_f64(),
695 &Self::TRIGGER_LATENCY_BUCKETS,
696 );
697 }
698
699 pub fn record_trigger_retry_delay(
700 &self,
701 trigger_id: &str,
702 binding_key: &str,
703 provider: &str,
704 tenant_id: Option<&str>,
705 status: &str,
706 duration: StdDuration,
707 ) {
708 self.observe_histogram(
709 "harn_trigger_retry_delay_seconds",
710 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
711 duration.as_secs_f64(),
712 &Self::TRIGGER_LATENCY_BUCKETS,
713 );
714 }
715
716 pub fn record_trigger_accepted_to_dlq(
717 &self,
718 trigger_id: &str,
719 binding_key: &str,
720 provider: &str,
721 tenant_id: Option<&str>,
722 status: &str,
723 duration: StdDuration,
724 ) {
725 self.observe_histogram(
726 "harn_trigger_accepted_to_dlq_seconds",
727 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
728 duration.as_secs_f64(),
729 &Self::TRIGGER_LATENCY_BUCKETS,
730 );
731 }
732
    /// Registers a pending trigger event under its lifecycle label set and
    /// refreshes the oldest-pending-age gauge for that set.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Inner scope drops the map lock before the gauge refresh re-locks it.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Removes a pending trigger event (pruning empty label-set entries) and
    /// refreshes the oldest-pending-age gauge for that set.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Inner scope drops the map lock before the gauge refresh re-locks it.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
781
    /// Sets the number of in-flight dispatches for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Sets today's accumulated budget spend for a trigger (clamped at 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts one budget exhaustion, labelled with the handling strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counts one backpressure action on the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }

    /// Observes one event-log append: duration histogram plus a gauge holding
    /// the last payload size for the topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Sets a consumer's lag behind the head of a topic.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Counts one agent-to-agent hop and observes its duration.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Sets the current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observes how old a queue item was when claimed (clamped at 0 seconds).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
871
872 pub fn record_scheduler_selection(
874 &self,
875 queue: &str,
876 fairness_dimension: &str,
877 fairness_key: &str,
878 ) {
879 self.increment_counter(
880 "harn_scheduler_selections_total",
881 labels([
882 ("queue", queue),
883 ("fairness_dimension", fairness_dimension),
884 ("fairness_key", fairness_key),
885 ]),
886 1,
887 );
888 }
889
890 pub fn record_scheduler_deferral(
893 &self,
894 queue: &str,
895 fairness_dimension: &str,
896 fairness_key: &str,
897 ) {
898 self.increment_counter(
899 "harn_scheduler_deferrals_total",
900 labels([
901 ("queue", queue),
902 ("fairness_dimension", fairness_dimension),
903 ("fairness_key", fairness_key),
904 ]),
905 1,
906 );
907 }
908
909 pub fn record_scheduler_starvation_promotion(
911 &self,
912 queue: &str,
913 fairness_dimension: &str,
914 fairness_key: &str,
915 ) {
916 self.increment_counter(
917 "harn_scheduler_starvation_promotions_total",
918 labels([
919 ("queue", queue),
920 ("fairness_dimension", fairness_dimension),
921 ("fairness_key", fairness_key),
922 ]),
923 1,
924 );
925 }
926
927 pub fn set_scheduler_deficit(
929 &self,
930 queue: &str,
931 fairness_dimension: &str,
932 fairness_key: &str,
933 deficit: i64,
934 ) {
935 self.set_gauge(
936 "harn_scheduler_deficit",
937 labels([
938 ("queue", queue),
939 ("fairness_dimension", fairness_dimension),
940 ("fairness_key", fairness_key),
941 ]),
942 deficit as f64,
943 );
944 }
945
946 pub fn set_scheduler_oldest_eligible_age(
948 &self,
949 queue: &str,
950 fairness_dimension: &str,
951 fairness_key: &str,
952 age_ms: u64,
953 ) {
954 self.set_gauge(
955 "harn_scheduler_oldest_eligible_age_seconds",
956 labels([
957 ("queue", queue),
958 ("fairness_dimension", fairness_dimension),
959 ("fairness_key", fairness_key),
960 ]),
961 age_ms as f64 / 1000.0,
962 );
963 }
964
    /// Sets the orchestrator pump's backlog size for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Sets the number of outstanding (handed-off, unfinished) pump items.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Observes how long a pump item waited before being admitted.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Counts one LLM call and accumulates its cost. Zero/negative cost still
    /// materializes the cost series (via `ensure_counter`) so the metric is
    /// rendered even before any paid call happens.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts one LLM cache hit for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }
1021
    /// Renders all metrics in Prometheus text exposition format: the fixed
    /// snapshot counters first, then sanitized custom counters, two static
    /// Slack auto-disable configuration gauges, and finally the generic
    /// labelled counter/gauge/histogram families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Fixed counters exported under stable, externally-documented names.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Replace anything outside [A-Za-z0-9_] so the rendered name is a
            // valid Prometheus metric identifier.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static configuration values exported as gauges for dashboards.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
1086
1087 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1088 let amount = amount.into();
1089 if amount <= 0.0 || !amount.is_finite() {
1090 return;
1091 }
1092 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1093 *counters.entry((name.to_string(), labels)).or_default() += amount;
1094 }
1095
1096 fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1097 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1098 counters.entry((name.to_string(), labels)).or_default();
1099 }
1100
1101 fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1102 let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1103 gauges.insert((name.to_string(), labels), value);
1104 }
1105
    /// Records `value` into the named histogram series, creating it on first
    /// observation with the given bucket bounds.
    ///
    /// Buckets are cumulative in the Prometheus sense: every bucket whose
    /// upper bound is >= `value` is incremented, and `+Inf` always is.
    /// NOTE(review): the bounds are fixed when the series is first created;
    /// a later call with different `bucket_bounds` would insert stray bucket
    /// keys via `or_default` — assumes each metric name always uses one
    /// bound set (true for all callers visible here).
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        // NaN/infinite observations would corrupt the sum; drop them.
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        // +Inf is cumulative and therefore counts every observation.
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1138
    /// Recomputes the oldest-pending-age gauge for one label set from the
    /// pending-event map; 0.0 when nothing is pending for that set.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        // Oldest pending event = minimum accepted-at timestamp for the set.
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }
1155
    /// Appends the generic labelled families (counters, gauges, histograms)
    /// to `rendered` in Prometheus text format, grouped under per-family
    /// `# TYPE` headers; histogram series expand into `_bucket` (with an `le`
    /// label), `_sum`, and `_count` samples.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone the maps so locks are not held while rendering.
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                for (le, value) in &histogram.buckets {
                    // Each bucket sample carries the series labels plus `le`.
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1217}
1218
/// Which Prometheus metric type a metric family belongs to.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Returns the fixed, statically-known set of metric family names exported
/// for the given metric kind.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTERS: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
        "harn_scheduler_selections_total",
        "harn_scheduler_deferrals_total",
        "harn_scheduler_starvation_promotions_total",
    ];
    const GAUGES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
        "harn_scheduler_deficit",
        "harn_scheduler_oldest_eligible_age_seconds",
    ];
    const HISTOGRAMS: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];
    match kind {
        MetricKind::Counter => COUNTERS,
        MetricKind::Gauge => GAUGES,
        MetricKind::Histogram => HISTOGRAMS,
    }
}
1276
1277fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1278 pairs
1279 .into_iter()
1280 .map(|(name, value)| (name.to_string(), value.to_string()))
1281 .collect()
1282}
1283
1284fn trigger_lifecycle_labels(
1285 trigger_id: &str,
1286 binding_key: &str,
1287 provider: &str,
1288 tenant_id: Option<&str>,
1289 status: &str,
1290) -> MetricLabels {
1291 labels([
1292 ("binding_key", binding_key),
1293 ("provider", provider),
1294 ("status", status),
1295 ("tenant_id", tenant_label(tenant_id)),
1296 ("trigger_id", trigger_id),
1297 ])
1298}
1299
1300fn trigger_pending_labels(
1301 trigger_id: &str,
1302 binding_key: &str,
1303 provider: &str,
1304 tenant_id: Option<&str>,
1305) -> MetricLabels {
1306 labels([
1307 ("binding_key", binding_key),
1308 ("provider", provider),
1309 ("tenant_id", tenant_label(tenant_id)),
1310 ("trigger_id", trigger_id),
1311 ])
1312}
1313
/// Normalizes an optional tenant id into a label value.
///
/// Absent or whitespace-only tenants collapse to the literal "none" so every
/// sample always carries a `tenant_id` label.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1320
/// Clamped difference `later_ms - earlier_ms` as a non-negative `Duration`;
/// a negative delta yields zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    if delta_ms <= 0 {
        StdDuration::ZERO
    } else {
        StdDuration::from_millis(delta_ms as u64)
    }
}
1324
1325fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1326 rendered.push_str(name);
1327 if !labels.is_empty() {
1328 rendered.push('{');
1329 for (index, (label, label_value)) in labels.iter().enumerate() {
1330 if index > 0 {
1331 rendered.push(',');
1332 }
1333 rendered.push_str(label);
1334 rendered.push_str("=\"");
1335 rendered.push_str(&escape_label_value(label_value));
1336 rendered.push('"');
1337 }
1338 rendered.push('}');
1339 }
1340 rendered.push(' ');
1341 rendered.push_str(&prometheus_float(value));
1342 rendered.push('\n');
1343}
1344
/// Escapes a label value for the Prometheus text exposition format.
///
/// Backslash, double quote, and newline are the three characters the format
/// requires escaping; every other character passes through unchanged.
fn escape_label_value(value: &str) -> String {
    // Build directly into one pre-sized String instead of allocating a
    // Vec<char> per escaped character as the previous flat_map version did.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1356
/// Renders an `f64` the way the Prometheus text format expects.
///
/// Integral values print without a fractional part; other finite values print
/// with up to six decimals, trailing zeros trimmed. Non-finite values use the
/// canonical spellings `+Inf`, `-Inf`, and `NaN`. The previous version only
/// special-cased positive infinity, so negative infinity fell through to
/// `format!` and rendered as `-inf`, which Prometheus parsers reject.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1371
/// Declares the payload schema a connector emits for its trigger events.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Logical schema name used to identify the payload shape inside harn.
    pub harn_schema_name: String,
    /// Optional inline JSON Schema document; JSON null when absent.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1379
1380impl ProviderPayloadSchema {
1381 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1382 Self {
1383 harn_schema_name: harn_schema_name.into(),
1384 json_schema,
1385 }
1386 }
1387
1388 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1389 Self::new(harn_schema_name, JsonValue::Null)
1390 }
1391}
1392
impl Default for ProviderPayloadSchema {
    /// The fallback schema: the raw, unvalidated payload.
    fn default() -> Self {
        Self::named("raw")
    }
}
1398
/// Newtype for a provider-defined trigger kind (e.g. "webhook").
///
/// `#[serde(transparent)]` keeps the wire representation a bare string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1403
1404impl TriggerKind {
1405 pub fn new(value: impl Into<String>) -> Self {
1406 Self(value.into())
1407 }
1408
1409 pub fn as_str(&self) -> &str {
1410 self.0.as_str()
1411 }
1412}
1413
/// Convenience conversion so string literals can be used where a kind is expected.
impl From<&str> for TriggerKind {
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}
1419
/// Convenience conversion from an owned String.
impl From<String> for TriggerKind {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}
1425
/// One registered trigger binding: a provider + kind pair with its dedupe
/// settings and provider-specific configuration.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding listens to.
    pub provider: ProviderId,
    /// Trigger kind within the provider (e.g. "webhook").
    pub kind: TriggerKind,
    /// Caller-chosen identifier for this binding.
    pub binding_id: String,
    /// Optional explicit dedupe key; None lets the connector derive one.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// How long dedupe records are retained, in days.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration blob; JSON null when absent.
    #[serde(default)]
    pub config: JsonValue,
}
1439
1440impl TriggerBinding {
1441 pub fn new(
1442 provider: ProviderId,
1443 kind: impl Into<TriggerKind>,
1444 binding_id: impl Into<String>,
1445 ) -> Self {
1446 Self {
1447 provider,
1448 kind: kind.into(),
1449 binding_id: binding_id.into(),
1450 dedupe_key: None,
1451 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1452 config: JsonValue::Null,
1453 }
1454 }
1455}
1456
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1460
/// In-memory collection of trigger bindings, grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Providers sorted by id; bindings kept in registration order.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1466
1467impl TriggerRegistry {
1468 pub fn register(&mut self, binding: TriggerBinding) {
1469 self.bindings
1470 .entry(binding.provider.clone())
1471 .or_default()
1472 .push(binding);
1473 }
1474
1475 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1476 &self.bindings
1477 }
1478
1479 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1480 self.bindings
1481 .get(provider)
1482 .map(Vec::as_slice)
1483 .unwrap_or(&[])
1484 }
1485}
1486
/// Result of activating one connector: which provider and how many bindings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of trigger bindings the activation covered.
    pub binding_count: usize,
}
1493
impl ActivationHandle {
    /// Records that `binding_count` bindings were activated for `provider`.
    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
        Self {
            provider,
            binding_count,
        }
    }
}
1502
/// A raw inbound delivery (e.g. an HTTP webhook) before connector
/// normalization into a `TriggerEvent`.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Delivery kind label (provider-specific, e.g. the event type).
    pub kind: String,
    /// Transport headers, keyed as received.
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters, empty by default.
    pub query: BTreeMap<String, String>,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// When this process accepted the delivery.
    pub received_at: OffsetDateTime,
    /// When the upstream event occurred, if the provider reported it.
    pub occurred_at: Option<OffsetDateTime>,
    /// Resolved tenant, if known at ingest time.
    pub tenant_id: Option<TenantId>,
    /// Extra transport metadata; JSON null when absent.
    pub metadata: JsonValue,
}
1515
1516impl RawInbound {
1517 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1518 Self {
1519 kind: kind.into(),
1520 headers,
1521 query: BTreeMap::new(),
1522 body,
1523 received_at: clock::now_utc(),
1524 occurred_at: None,
1525 tenant_id: None,
1526 metadata: JsonValue::Null,
1527 }
1528 }
1529
1530 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1531 Ok(serde_json::from_slice(&self.body)?)
1532 }
1533}
1534
1535#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1537pub struct RateLimitConfig {
1538 pub capacity: u32,
1539 pub refill_tokens: u32,
1540 pub refill_interval: StdDuration,
1541}
1542
1543impl Default for RateLimitConfig {
1544 fn default() -> Self {
1545 Self {
1546 capacity: 60,
1547 refill_tokens: 1,
1548 refill_interval: StdDuration::from_secs(1),
1549 }
1550 }
1551}
1552
/// Mutable token-bucket state for a single (provider, key) scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional tokens currently available.
    tokens: f64,
    // Instant of the last refill computation (test-controllable clock).
    last_refill: ClockInstant,
}
1558
1559impl TokenBucket {
1560 fn full(config: RateLimitConfig) -> Self {
1561 Self {
1562 tokens: config.capacity as f64,
1563 last_refill: clock::instant_now(),
1564 }
1565 }
1566
1567 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1568 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1569 let rate = config.refill_tokens.max(1) as f64 / interval;
1570 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1571 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1572 self.last_refill = now;
1573 }
1574
1575 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1576 self.refill(config, now);
1577 if self.tokens >= 1.0 {
1578 self.tokens -= 1.0;
1579 true
1580 } else {
1581 false
1582 }
1583 }
1584
1585 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1586 if self.tokens >= 1.0 {
1587 return StdDuration::ZERO;
1588 }
1589 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1590 let rate = config.refill_tokens.max(1) as f64 / interval;
1591 let missing = (1.0 - self.tokens).max(0.0);
1592 StdDuration::from_secs_f64((missing / rate).max(0.001))
1593 }
1594}
1595
1596#[derive(Debug)]
1598pub struct RateLimiterFactory {
1599 config: RateLimitConfig,
1600 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1601}
1602
1603impl RateLimiterFactory {
1604 pub fn new(config: RateLimitConfig) -> Self {
1605 Self {
1606 config,
1607 buckets: Mutex::new(HashMap::new()),
1608 }
1609 }
1610
1611 pub fn config(&self) -> RateLimitConfig {
1612 self.config
1613 }
1614
1615 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1616 ScopedRateLimiter {
1617 factory: self,
1618 provider: provider.clone(),
1619 key: key.into(),
1620 }
1621 }
1622
1623 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1624 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1625 let bucket = buckets
1626 .entry((provider.as_str().to_string(), key.to_string()))
1627 .or_insert_with(|| TokenBucket::full(self.config));
1628 bucket.try_acquire(self.config, clock::instant_now())
1629 }
1630
1631 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1632 loop {
1633 let wait = {
1634 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1635 let bucket = buckets
1636 .entry((provider.as_str().to_string(), key.to_string()))
1637 .or_insert_with(|| TokenBucket::full(self.config));
1638 if bucket.try_acquire(self.config, clock::instant_now()) {
1639 return;
1640 }
1641 bucket.wait_duration(self.config)
1642 };
1643 tokio::time::sleep(wait).await;
1644 }
1645 }
1646}
1647
impl Default for RateLimiterFactory {
    /// A factory with the default 60-per-minute-burst configuration.
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1653
1654#[derive(Clone, Debug)]
1656pub struct ScopedRateLimiter<'a> {
1657 factory: &'a RateLimiterFactory,
1658 provider: ProviderId,
1659 key: String,
1660}
1661
impl<'a> ScopedRateLimiter<'a> {
    /// Non-blocking acquire against this scope's bucket.
    pub fn try_acquire(&self) -> bool {
        self.factory.try_acquire(&self.provider, &self.key)
    }

    /// Awaits until this scope's bucket yields a token.
    pub async fn acquire(&self) {
        self.factory.acquire(&self.provider, &self.key).await;
    }
}
1671
/// Registry of connector instances keyed by provider id.
pub struct ConnectorRegistry {
    // Each connector lives behind an async mutex so init/activate can take
    // exclusive access while clients share the handle.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1676
1677impl ConnectorRegistry {
1678 pub fn empty() -> Self {
1679 Self {
1680 connectors: BTreeMap::new(),
1681 }
1682 }
1683
1684 pub fn with_defaults() -> Self {
1685 let mut registry = Self::empty();
1686 for provider in registered_provider_metadata() {
1687 registry
1688 .register(default_connector_for_provider(&provider))
1689 .expect("default connector registration should not fail");
1690 }
1691 registry
1692 }
1693
1694 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1695 let provider = connector.provider_id().clone();
1696 if self.connectors.contains_key(&provider) {
1697 return Err(ConnectorError::DuplicateProvider(provider.0));
1698 }
1699 self.connectors
1700 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1701 Ok(())
1702 }
1703
1704 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1705 self.connectors.get(id).cloned()
1706 }
1707
1708 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1709 self.connectors.remove(id)
1710 }
1711
1712 pub fn list(&self) -> Vec<ProviderId> {
1713 self.connectors.keys().cloned().collect()
1714 }
1715
1716 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1717 for connector in self.connectors.values() {
1718 connector.lock().await.init(ctx.clone()).await?;
1719 }
1720 Ok(())
1721 }
1722
1723 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1724 let mut clients = BTreeMap::new();
1725 for (provider, connector) in &self.connectors {
1726 let client = connector.lock().await.client();
1727 clients.insert(provider.clone(), client);
1728 }
1729 clients
1730 }
1731
1732 pub async fn activate_all(
1733 &self,
1734 registry: &TriggerRegistry,
1735 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1736 let mut handles = Vec::new();
1737 for (provider, connector) in &self.connectors {
1738 let bindings = registry.bindings_for(provider);
1739 if bindings.is_empty() {
1740 continue;
1741 }
1742 let connector = connector.lock().await;
1743 handles.push(connector.activate(bindings).await?);
1744 }
1745 Ok(handles)
1746 }
1747}
1748
impl Default for ConnectorRegistry {
    /// Defaults to the catalog-populated registry.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1754
1755fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1756 if provider.provider == "github" {
1766 return Box::new(GitHubConnector::new());
1767 }
1768 if provider.provider == "linear" {
1769 return Box::new(LinearConnector::new());
1770 }
1771 if provider.provider == "slack" {
1772 return Box::new(SlackConnector::new());
1773 }
1774 if provider.provider == "notion" {
1775 return Box::new(NotionConnector::new());
1776 }
1777 if provider.provider == "a2a-push" {
1778 return Box::new(A2aPushConnector::new());
1779 }
1780 match &provider.runtime {
1781 ProviderRuntimeMetadata::Builtin {
1782 connector,
1783 default_signature_variant,
1784 } => match connector.as_str() {
1785 "cron" => Box::new(CronConnector::new()),
1786 "stream" => Box::new(StreamConnector::new(
1787 ProviderId::from(provider.provider.clone()),
1788 provider.schema_name.clone(),
1789 )),
1790 "webhook" => {
1791 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1792 .expect("catalog webhook signature variant must be valid");
1793 Box::new(GenericWebhookConnector::with_profile(
1794 WebhookProviderProfile::new(
1795 ProviderId::from(provider.provider.clone()),
1796 provider.schema_name.clone(),
1797 variant,
1798 ),
1799 ))
1800 }
1801 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1802 },
1803 ProviderRuntimeMetadata::Placeholder => {
1804 Box::new(PlaceholderConnector::from_metadata(provider))
1805 }
1806 }
1807}
1808
/// Stand-in connector for cataloged providers without a real implementation.
struct PlaceholderConnector {
    // Provider this placeholder represents.
    provider_id: ProviderId,
    // Trigger kinds advertised by the catalog entry.
    kinds: Vec<TriggerKind>,
    // Payload schema name advertised by the catalog entry.
    schema_name: String,
}
1814
1815impl PlaceholderConnector {
1816 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1817 Self {
1818 provider_id: ProviderId::from(metadata.provider.clone()),
1819 kinds: metadata
1820 .kinds
1821 .iter()
1822 .cloned()
1823 .map(TriggerKind::from)
1824 .collect(),
1825 schema_name: metadata.schema_name.clone(),
1826 }
1827 }
1828}
1829
/// Client counterpart of `PlaceholderConnector`; every call fails.
struct PlaceholderClient;
1831
#[async_trait]
impl ConnectorClient for PlaceholderClient {
    /// Always errors: placeholder providers expose no callable client methods.
    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
        Err(ClientError::Other(format!(
            "connector client method '{method}' is not implemented for this provider"
        )))
    }
}
1840
1841#[async_trait]
1842impl Connector for PlaceholderConnector {
1843 fn provider_id(&self) -> &ProviderId {
1844 &self.provider_id
1845 }
1846
1847 fn kinds(&self) -> &[TriggerKind] {
1848 &self.kinds
1849 }
1850
1851 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1852 Ok(())
1853 }
1854
1855 async fn activate(
1856 &self,
1857 bindings: &[TriggerBinding],
1858 ) -> Result<ActivationHandle, ConnectorError> {
1859 Ok(ActivationHandle::new(
1860 self.provider_id.clone(),
1861 bindings.len(),
1862 ))
1863 }
1864
1865 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1866 Err(ConnectorError::Unsupported(format!(
1867 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1868 self.provider_id.as_str()
1869 )))
1870 }
1871
1872 fn payload_schema(&self) -> ProviderPayloadSchema {
1873 ProviderPayloadSchema::named(self.schema_name.clone())
1874 }
1875
1876 fn client(&self) -> Arc<dyn ConnectorClient> {
1877 Arc::new(PlaceholderClient)
1878 }
1879}
1880
1881pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1882 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1883 *slot.borrow_mut() = clients
1884 .into_iter()
1885 .map(|(provider, client)| (provider.as_str().to_string(), client))
1886 .collect();
1887 });
1888}
1889
/// Looks up the active client for `provider` in this thread's registry.
pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
}
1893
/// Drops every client from this thread's active-client registry.
pub fn clear_active_connector_clients() {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
}
1897
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client whose `call` echoes the method name back.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test connector that counts `activate` calls via a shared atomic.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering the same provider twice must fail with DuplicateProvider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only connectors with at least one trigger binding should be activated.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Buckets are independent per (provider, key) scope.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // End-to-end render check: every recorded metric family must appear in the
    // Prometheus exposition output with the expected label sets.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Label keys render in sorted order (BTreeMap), which these needles rely on.
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}