1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47 connector_export_denied_builtin_reason, connector_export_effect_class,
48 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61 SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
// Shared timeout applied to every outbound connector HTTP call.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);

/// Builds the `reqwest` client connectors use for outbound HTTP.
///
/// The client carries the connector-specific `user_agent` and the global
/// 30-second timeout. `build()` only fails on invalid configuration, which
/// would be a programming bug here — hence the `expect`.
pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
    reqwest::Client::builder()
        .user_agent(user_agent)
        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
        .build()
        .expect("connector HTTP client configuration should be valid")
}
81
/// Shared, async-lock-guarded handle to a boxed connector instance.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map of key -> active connector client.
    // NOTE(review): the thread-local scope looks intentional (isolation per
    // worker thread/test), but confirm callers never expect cross-thread
    // visibility of registered clients.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
89
/// Lifecycle and normalization contract implemented by every trigger
/// provider (Slack, GitHub, cron, generic webhook, ...).
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can handle.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with shared runtime services.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the given trigger bindings and returns a handle that
    /// represents the live activation.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown; the default implementation is a no-op that
    /// ignores the deadline.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts one raw inbound delivery into a single trigger event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization entry point that can yield batches, immediate
    /// HTTP responses, or rejections. Defaults to wrapping the single event
    /// produced by `normalize_inbound`.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing the payloads this provider emits.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// RPC client exposing provider operations to callers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
133
/// Plain HTTP response a connector can hand straight back to a webhook
/// caller (see `ConnectorNormalizeResult::ImmediateResponse` / `Reject`).
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code to send.
    pub status: u16,
    /// Response headers; `BTreeMap` keeps iteration deterministic.
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
141
142impl ConnectorHttpResponse {
143 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
144 Self {
145 status,
146 headers,
147 body,
148 }
149 }
150}
151
/// Outcome of normalizing one raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one trigger event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Zero or more events from a single delivery (e.g. batched payloads).
    Batch(Vec<TriggerEvent>),
    /// Events plus a response the caller must receive immediately
    /// (e.g. a provider handshake/challenge reply).
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Delivery rejected: send this response and emit no events.
    Reject(ConnectorHttpResponse),
}
163
164impl ConnectorNormalizeResult {
165 pub fn event(event: TriggerEvent) -> Self {
166 Self::Event(Box::new(event))
167 }
168
169 pub fn into_events(self) -> Vec<TriggerEvent> {
170 match self {
171 Self::Event(event) => vec![*event],
172 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
173 Self::Reject(_) => Vec::new(),
174 }
175 }
176}
177
/// Result of dedupe post-processing for one normalized event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe (or dedupe was disabled) and should proceed.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed; drop it.
    DuplicateDropped,
}
183
/// Applies inbox deduplication to a freshly normalized event.
///
/// When dedupe is enabled and the event has not already claimed its dedupe
/// key, attempts to insert `(binding_id, dedupe_key)` into the inbox index
/// with the given TTL. A failed insert means a duplicate delivery, so the
/// event is dropped; on success the claim is recorded on the event so later
/// stages skip re-claiming.
///
/// # Errors
/// Propagates inbox-index failures as `ConnectorError`.
pub async fn postprocess_normalized_event(
    inbox: &InboxIndex,
    binding_id: &str,
    dedupe_enabled: bool,
    dedupe_ttl: StdDuration,
    mut event: TriggerEvent,
) -> Result<PostNormalizeOutcome, ConnectorError> {
    if dedupe_enabled && !event.dedupe_claimed() {
        // `insert_if_new` returns false when the key was already present.
        if !inbox
            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
            .await?
        {
            return Ok(PostNormalizeOutcome::DuplicateDropped);
        }
        event.mark_dedupe_claimed();
    }

    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
}
203
/// JSON-RPC-style surface a connector exposes for outbound operations.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with JSON `args` and returns the JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
209
/// Errors surfaced by `ConnectorClient::call`.
///
/// Every variant carries a pre-formatted message, so `Display` forwards the
/// message verbatim without adding variant-specific framing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    Other(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Extract the inner message first, then delegate to the `str`
        // formatter so width/padding flags keep working.
        let message = match self {
            Self::MethodNotFound(message) => message,
            Self::InvalidArgs(message) => message,
            Self::RateLimited(message) => message,
            Self::Transport(message) => message,
            Self::Other(message) => message,
        };
        message.fmt(f)
    }
}

impl std::error::Error for ClientError {}
233
/// Failure modes across connector registration, verification, and dispatch.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider is already registered.
    DuplicateProvider(String),
    /// The same delivery was seen twice (dedupe hit).
    DuplicateDelivery(String),
    /// No connector registered for the named provider.
    UnknownProvider(String),
    /// A required inbound header was absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// Inbound timestamp fell outside the replay-protection window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup/decryption failure.
    Secret(String),
    /// Event-log append/read failure.
    EventLog(String),
    /// Error from the embedded harn runtime.
    HarnRuntime(String),
    /// Error propagated from a `ConnectorClient` call.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failed.
    Activation(String),
}
259
260impl ConnectorError {
261 pub fn invalid_signature(message: impl Into<String>) -> Self {
262 Self::InvalidSignature(message.into())
263 }
264}
265
impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DuplicateProvider(provider) => {
                write!(f, "connector provider `{provider}` is already registered")
            }
            // Variants whose payload is already a complete message are
            // forwarded verbatim.
            Self::DuplicateDelivery(message) => message.fmt(f),
            Self::UnknownProvider(provider) => {
                write!(f, "connector provider `{provider}` is not registered")
            }
            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
            Self::InvalidHeader { name, detail } => {
                write!(f, "invalid header `{name}`: {detail}")
            }
            Self::InvalidSignature(message)
            | Self::Json(message)
            | Self::Secret(message)
            | Self::EventLog(message)
            | Self::HarnRuntime(message)
            | Self::Unsupported(message)
            | Self::Activation(message) => message.fmt(f),
            Self::TimestampOutOfWindow {
                timestamp,
                now,
                window,
            } => write!(
                f,
                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
            ),
            Self::Client(error) => error.fmt(f),
        }
    }
}

impl std::error::Error for ConnectorError {}
301
// Conversions so `?` works on the common fallible operations inside
// connectors; error sources are flattened to their display strings except
// `ClientError`, which is kept structured.

impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
325
/// Shared runtime services handed to every connector in `Connector::init`.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log for audit/dispatch records.
    pub event_log: Arc<AnyEventLog>,
    /// Secret lookup (signing keys, tokens).
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Process-wide metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for per-connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
335
/// Point-in-time copy of the fixed connector counters
/// (see `MetricsRegistry::snapshot`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    // Gauge-like: last stored value, not a running total.
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
352
/// Label set for one metric sample; `BTreeMap` keeps label order (and thus
/// rendered output) deterministic.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one labelled histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative bucket counts keyed by rendered `le` bound (incl. "+Inf").
    buckets: BTreeMap<String, u64>,
    /// Total number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
361
// Process-wide slot for the currently installed metrics registry. The outer
// `OnceLock` lazily creates the slot; the inner `Mutex<Option<..>>` lets the
// installed registry be swapped or cleared at runtime.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();

/// Installs `metrics` as the process-wide active registry, replacing any
/// previously installed one.
pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
}

/// Clears the active registry if one was ever installed (no-op otherwise).
pub fn clear_active_metrics_registry() {
    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
        *slot.lock().expect("active metrics registry poisoned") = None;
    }
}

/// Returns a clone of the active registry handle, if one is installed.
pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
        slot.lock()
            .expect("active metrics registry poisoned")
            .clone()
    })
}
382
/// Process-wide metrics sink.
///
/// Hot, fixed-name counters are lock-free atomics; everything else (labelled
/// counters, gauges, histograms, pending-event bookkeeping) lives behind
/// mutex-guarded `BTreeMap`s so Prometheus rendering is deterministic.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Gauge semantics: `store`d, not accumulated.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters registered by connectors at runtime.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Generic labelled families keyed by (metric name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per-label-set map of pending event id -> accepted-at millis, used to
    // derive the oldest-pending-age gauge.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
404
impl MetricsRegistry {
    /// Bucket bounds (seconds) for short-lived operations (HTTP calls, log
    /// appends, queue claims).
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket bounds (seconds) for trigger lifecycle stages, which can take
    /// minutes while events sit in queues or retry backoff.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket bounds (bytes) for payload-size histograms: 128 B to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Captures a point-in-time copy of the fixed atomic counters.
    ///
    /// Loads are `Relaxed`: counters are independent, so the snapshot is not
    /// required to be a single consistent cut across all of them.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }
432
    /// Counts one dedupe-claim write to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the in-memory fast path (also bumps the
    /// overall duplicates counter).
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the durable store (also bumps the
    /// overall duplicates counter).
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` expired inbox entries; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the gauge of currently active inbox entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts a Linear webhook rejected for an out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one scheduled retry.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
488
489 pub fn record_custom_counter(&self, name: &str, amount: u64) {
490 if amount == 0 {
491 return;
492 }
493 let mut counters = self
494 .custom_counters
495 .lock()
496 .expect("custom counters poisoned");
497 *counters.entry(name.to_string()).or_default() += amount;
498 }
499
    /// Records one inbound HTTP request: a labelled request counter plus
    /// per-endpoint latency and body-size histograms.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }
530
    /// Counts one inbound trigger event for the (trigger, provider) pair.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts a deduped trigger event, labelled with the dedupe reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Counts one predicate evaluation (labelled by its boolean result) and
    /// records its cost; negative costs are clamped to zero.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts one dispatch attempt, labelled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts one retry, labelled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts one event sent to the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
599
    // The following recorders all observe one trigger lifecycle stage into a
    // dedicated histogram, sharing the same label set
    // (`trigger_lifecycle_labels`) and latency bucket layout.

    /// Latency from webhook acceptance to normalization.
    pub fn record_trigger_accepted_to_normalized(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Latency from webhook acceptance to queue append.
    pub fn record_trigger_accepted_to_queue_append(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Queue age when the event is admitted for dispatch.
    pub fn record_trigger_queue_age_at_dispatch_admission(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Queue age when dispatch actually starts.
    pub fn record_trigger_queue_age_at_dispatch_start(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Wall-clock runtime of the dispatch itself.
    pub fn record_trigger_dispatch_runtime(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_dispatch_runtime_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Delay before a scheduled retry fires.
    pub fn record_trigger_retry_delay(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_retry_delay_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Total time from acceptance until the event lands in the DLQ.
    pub fn record_trigger_accepted_to_dlq(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_accepted_to_dlq_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }
718
    /// Registers a pending trigger event and refreshes the oldest-pending-age
    /// gauge for its label set. `accepted_at_ms` and `now_ms` are millisecond
    /// timestamps on the same clock (NOTE(review): presumably epoch millis —
    /// confirm with callers).
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Inner scope releases the lock before recomputing the gauge,
            // which re-acquires it.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Removes a pending trigger event (dropping the label-set entry when it
    /// empties) and refreshes the oldest-pending-age gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Sets the in-flight gauge for one trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Sets today's spent-budget gauge for one trigger; negative values are
    /// clamped to zero.
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts one budget-exhaustion event, labelled by the applied strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counts one backpressure action on the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
799
    /// Records one event-log append: a latency histogram plus a gauge of the
    /// payload size last written to the topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Sets the consumer-lag gauge for a (topic, consumer) pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Records one agent-to-agent hop: an outcome-labelled counter and a
    /// per-target latency histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Sets the depth gauge for one worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observes how old a queue item was when claimed; negative ages are
    /// clamped to zero.
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Sets the orchestrator pump backlog gauge for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Sets the outstanding-work gauge for the orchestrator pump.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Observes how long an item waited before the pump admitted it.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
882
    /// Records one LLM call and its cost.
    ///
    /// Zero-cost calls still register the cost counter at its current value
    /// (via `ensure_counter`) so the series appears in scrapes even before
    /// any spend accrues — `increment_counter` would drop a 0.0 amount.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts one LLM cache hit for the provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }
914
    /// Renders the registry in Prometheus text exposition format.
    ///
    /// Output order: the fixed snapshot counters under their legacy metric
    /// names, then custom counters (names sanitized to `[A-Za-z0-9_]` and
    /// wrapped as `connector_custom_<name>_total`), two hard-coded Slack
    /// auto-disable threshold gauges, and finally all generic
    /// counter/gauge/histogram families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Replace anything outside [A-Za-z0-9_] so the rendered name is
            // a valid Prometheus identifier. NOTE(review): distinct raw names
            // can collide after sanitization; last one rendered wins a
            // duplicate TYPE line, not a merged value.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
979
980 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
981 let amount = amount.into();
982 if amount <= 0.0 || !amount.is_finite() {
983 return;
984 }
985 let mut counters = self.counters.lock().expect("metrics counters poisoned");
986 *counters.entry((name.to_string(), labels)).or_default() += amount;
987 }
988
989 fn ensure_counter(&self, name: &str, labels: MetricLabels) {
990 let mut counters = self.counters.lock().expect("metrics counters poisoned");
991 counters.entry((name.to_string(), labels)).or_default();
992 }
993
994 fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
995 let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
996 gauges.insert((name.to_string(), labels), value);
997 }
998
    /// Folds `value` into the named histogram series, creating the series on
    /// first observation with one bucket per bound plus "+Inf".
    ///
    /// Non-finite values are discarded. Buckets are cumulative: every bound
    /// `>= value` is incremented, and "+Inf" always is.
    ///
    /// NOTE(review): bucket keys are rendered strings, so all observations of
    /// a given metric name should pass the same `bucket_bounds`; mixing bound
    /// sets would grow the bucket map mid-series — confirm callers keep the
    /// bounds fixed per name (they do everywhere in this file).
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }

    /// Recomputes the oldest-pending-age gauge for one label set: the age of
    /// the earliest-accepted pending event, or 0 when none remain.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }
1048
    /// Appends all generic counters, gauges, and histograms to `rendered`, in
    /// the fixed family order defined by `metric_family_names`.
    ///
    /// Each store is cloned out up front so no lock is held while rendering.
    /// A `# TYPE` line is emitted for every known family even when it has no
    /// samples yet.
    fn render_generic_metrics(&self, rendered: &mut String) {
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Per-bucket samples carry the cumulative `le` label; then
                // the conventional `_sum` and `_count` series follow.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
}
1111
/// The three Prometheus metric kinds the generic renderer knows about.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Static registry of generic metric family names, grouped by kind; the
/// renderer iterates these in order so output stays stable across scrapes.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTERS: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
    ];
    const GAUGES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
    ];
    const HISTOGRAMS: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];
    match kind {
        MetricKind::Counter => COUNTERS,
        MetricKind::Gauge => GAUGES,
        MetricKind::Histogram => HISTOGRAMS,
    }
}
1164
1165fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1166 pairs
1167 .into_iter()
1168 .map(|(name, value)| (name.to_string(), value.to_string()))
1169 .collect()
1170}
1171
1172fn trigger_lifecycle_labels(
1173 trigger_id: &str,
1174 binding_key: &str,
1175 provider: &str,
1176 tenant_id: Option<&str>,
1177 status: &str,
1178) -> MetricLabels {
1179 labels([
1180 ("binding_key", binding_key),
1181 ("provider", provider),
1182 ("status", status),
1183 ("tenant_id", tenant_label(tenant_id)),
1184 ("trigger_id", trigger_id),
1185 ])
1186}
1187
1188fn trigger_pending_labels(
1189 trigger_id: &str,
1190 binding_key: &str,
1191 provider: &str,
1192 tenant_id: Option<&str>,
1193) -> MetricLabels {
1194 labels([
1195 ("binding_key", binding_key),
1196 ("provider", provider),
1197 ("tenant_id", tenant_label(tenant_id)),
1198 ("trigger_id", trigger_id),
1199 ])
1200}
1201
/// Normalize an optional tenant id into a label value: trimmed when present
/// and non-blank, the literal `"none"` otherwise.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1208
/// Difference between two millisecond timestamps as a `Duration`, clamped to
/// zero when `later_ms` is not actually later (clock skew / reordering).
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta = later_ms.saturating_sub(earlier_ms);
    if delta <= 0 {
        StdDuration::ZERO
    } else {
        StdDuration::from_millis(delta as u64)
    }
}
1212
1213fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1214 rendered.push_str(name);
1215 if !labels.is_empty() {
1216 rendered.push('{');
1217 for (index, (label, label_value)) in labels.iter().enumerate() {
1218 if index > 0 {
1219 rendered.push(',');
1220 }
1221 rendered.push_str(label);
1222 rendered.push_str("=\"");
1223 rendered.push_str(&escape_label_value(label_value));
1224 rendered.push('"');
1225 }
1226 rendered.push('}');
1227 }
1228 rendered.push(' ');
1229 rendered.push_str(&prometheus_float(value));
1230 rendered.push('\n');
1231}
1232
/// Escape a label value per the Prometheus text exposition format: backslash,
/// double quote, and newline must be backslash-escaped.
///
/// Rewritten from a `flat_map` that allocated a `Vec<char>` per character to a
/// single pass appending into one preallocated `String`.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1244
/// Format a sample value for the Prometheus text exposition format.
///
/// Special values use the exposition-format spellings `+Inf`, `-Inf`, and
/// `NaN`. (The previous version only special-cased positive infinity, so
/// negative infinity rendered as Rust's `-inf`, which Prometheus parsers do
/// not accept.) Whole numbers drop the fractional part; everything else is
/// rendered to six decimal places with trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1259
1260#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1262pub struct ProviderPayloadSchema {
1263 pub harn_schema_name: String,
1264 #[serde(default)]
1265 pub json_schema: JsonValue,
1266}
1267
1268impl ProviderPayloadSchema {
1269 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1270 Self {
1271 harn_schema_name: harn_schema_name.into(),
1272 json_schema,
1273 }
1274 }
1275
1276 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1277 Self::new(harn_schema_name, JsonValue::Null)
1278 }
1279}
1280
1281impl Default for ProviderPayloadSchema {
1282 fn default() -> Self {
1283 Self::named("raw")
1284 }
1285}
1286
1287#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1289#[serde(transparent)]
1290pub struct TriggerKind(String);
1291
1292impl TriggerKind {
1293 pub fn new(value: impl Into<String>) -> Self {
1294 Self(value.into())
1295 }
1296
1297 pub fn as_str(&self) -> &str {
1298 self.0.as_str()
1299 }
1300}
1301
1302impl From<&str> for TriggerKind {
1303 fn from(value: &str) -> Self {
1304 Self::new(value)
1305 }
1306}
1307
1308impl From<String> for TriggerKind {
1309 fn from(value: String) -> Self {
1310 Self::new(value)
1311 }
1312}
1313
1314#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1316pub struct TriggerBinding {
1317 pub provider: ProviderId,
1318 pub kind: TriggerKind,
1319 pub binding_id: String,
1320 #[serde(default)]
1321 pub dedupe_key: Option<String>,
1322 #[serde(default = "default_dedupe_retention_days")]
1323 pub dedupe_retention_days: u32,
1324 #[serde(default)]
1325 pub config: JsonValue,
1326}
1327
1328impl TriggerBinding {
1329 pub fn new(
1330 provider: ProviderId,
1331 kind: impl Into<TriggerKind>,
1332 binding_id: impl Into<String>,
1333 ) -> Self {
1334 Self {
1335 provider,
1336 kind: kind.into(),
1337 binding_id: binding_id.into(),
1338 dedupe_key: None,
1339 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1340 config: JsonValue::Null,
1341 }
1342 }
1343}
1344
1345fn default_dedupe_retention_days() -> u32 {
1346 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1347}
1348
1349#[derive(Clone, Debug, Default)]
1351pub struct TriggerRegistry {
1352 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1353}
1354
1355impl TriggerRegistry {
1356 pub fn register(&mut self, binding: TriggerBinding) {
1357 self.bindings
1358 .entry(binding.provider.clone())
1359 .or_default()
1360 .push(binding);
1361 }
1362
1363 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1364 &self.bindings
1365 }
1366
1367 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1368 self.bindings
1369 .get(provider)
1370 .map(Vec::as_slice)
1371 .unwrap_or(&[])
1372 }
1373}
1374
1375#[derive(Clone, Debug, PartialEq, Eq)]
1377pub struct ActivationHandle {
1378 pub provider: ProviderId,
1379 pub binding_count: usize,
1380}
1381
1382impl ActivationHandle {
1383 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1384 Self {
1385 provider,
1386 binding_count,
1387 }
1388 }
1389}
1390
1391#[derive(Clone, Debug, PartialEq)]
1393pub struct RawInbound {
1394 pub kind: String,
1395 pub headers: BTreeMap<String, String>,
1396 pub query: BTreeMap<String, String>,
1397 pub body: Vec<u8>,
1398 pub received_at: OffsetDateTime,
1399 pub occurred_at: Option<OffsetDateTime>,
1400 pub tenant_id: Option<TenantId>,
1401 pub metadata: JsonValue,
1402}
1403
1404impl RawInbound {
1405 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1406 Self {
1407 kind: kind.into(),
1408 headers,
1409 query: BTreeMap::new(),
1410 body,
1411 received_at: clock::now_utc(),
1412 occurred_at: None,
1413 tenant_id: None,
1414 metadata: JsonValue::Null,
1415 }
1416 }
1417
1418 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1419 Ok(serde_json::from_slice(&self.body)?)
1420 }
1421}
1422
/// Token-bucket parameters applied to every `(provider, key)` scope.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens the bucket holds (i.e. burst size).
    pub capacity: u32,
    /// Tokens restored every `refill_interval`.
    pub refill_tokens: u32,
    /// How often `refill_tokens` are credited back.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Burst of 60 with one token restored per second.
    fn default() -> Self {
        let refill_interval = StdDuration::from_secs(1);
        RateLimitConfig {
            capacity: 60,
            refill_tokens: 1,
            refill_interval,
        }
    }
}
1440
1441#[derive(Clone, Debug)]
1442struct TokenBucket {
1443 tokens: f64,
1444 last_refill: ClockInstant,
1445}
1446
1447impl TokenBucket {
1448 fn full(config: RateLimitConfig) -> Self {
1449 Self {
1450 tokens: config.capacity as f64,
1451 last_refill: clock::instant_now(),
1452 }
1453 }
1454
1455 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1456 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1457 let rate = config.refill_tokens.max(1) as f64 / interval;
1458 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1459 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1460 self.last_refill = now;
1461 }
1462
1463 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1464 self.refill(config, now);
1465 if self.tokens >= 1.0 {
1466 self.tokens -= 1.0;
1467 true
1468 } else {
1469 false
1470 }
1471 }
1472
1473 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1474 if self.tokens >= 1.0 {
1475 return StdDuration::ZERO;
1476 }
1477 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1478 let rate = config.refill_tokens.max(1) as f64 / interval;
1479 let missing = (1.0 - self.tokens).max(0.0);
1480 StdDuration::from_secs_f64((missing / rate).max(0.001))
1481 }
1482}
1483
1484#[derive(Debug)]
1486pub struct RateLimiterFactory {
1487 config: RateLimitConfig,
1488 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1489}
1490
1491impl RateLimiterFactory {
1492 pub fn new(config: RateLimitConfig) -> Self {
1493 Self {
1494 config,
1495 buckets: Mutex::new(HashMap::new()),
1496 }
1497 }
1498
1499 pub fn config(&self) -> RateLimitConfig {
1500 self.config
1501 }
1502
1503 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1504 ScopedRateLimiter {
1505 factory: self,
1506 provider: provider.clone(),
1507 key: key.into(),
1508 }
1509 }
1510
1511 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1512 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1513 let bucket = buckets
1514 .entry((provider.as_str().to_string(), key.to_string()))
1515 .or_insert_with(|| TokenBucket::full(self.config));
1516 bucket.try_acquire(self.config, clock::instant_now())
1517 }
1518
1519 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1520 loop {
1521 let wait = {
1522 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1523 let bucket = buckets
1524 .entry((provider.as_str().to_string(), key.to_string()))
1525 .or_insert_with(|| TokenBucket::full(self.config));
1526 if bucket.try_acquire(self.config, clock::instant_now()) {
1527 return;
1528 }
1529 bucket.wait_duration(self.config)
1530 };
1531 tokio::time::sleep(wait).await;
1532 }
1533 }
1534}
1535
1536impl Default for RateLimiterFactory {
1537 fn default() -> Self {
1538 Self::new(RateLimitConfig::default())
1539 }
1540}
1541
1542#[derive(Clone, Debug)]
1544pub struct ScopedRateLimiter<'a> {
1545 factory: &'a RateLimiterFactory,
1546 provider: ProviderId,
1547 key: String,
1548}
1549
1550impl<'a> ScopedRateLimiter<'a> {
1551 pub fn try_acquire(&self) -> bool {
1552 self.factory.try_acquire(&self.provider, &self.key)
1553 }
1554
1555 pub async fn acquire(&self) {
1556 self.factory.acquire(&self.provider, &self.key).await;
1557 }
1558}
1559
1560pub struct ConnectorRegistry {
1562 connectors: BTreeMap<ProviderId, ConnectorHandle>,
1563}
1564
1565impl ConnectorRegistry {
1566 pub fn empty() -> Self {
1567 Self {
1568 connectors: BTreeMap::new(),
1569 }
1570 }
1571
1572 pub fn with_defaults() -> Self {
1573 let mut registry = Self::empty();
1574 for provider in registered_provider_metadata() {
1575 registry
1576 .register(default_connector_for_provider(&provider))
1577 .expect("default connector registration should not fail");
1578 }
1579 registry
1580 }
1581
1582 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1583 let provider = connector.provider_id().clone();
1584 if self.connectors.contains_key(&provider) {
1585 return Err(ConnectorError::DuplicateProvider(provider.0));
1586 }
1587 self.connectors
1588 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1589 Ok(())
1590 }
1591
1592 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1593 self.connectors.get(id).cloned()
1594 }
1595
1596 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1597 self.connectors.remove(id)
1598 }
1599
1600 pub fn list(&self) -> Vec<ProviderId> {
1601 self.connectors.keys().cloned().collect()
1602 }
1603
1604 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1605 for connector in self.connectors.values() {
1606 connector.lock().await.init(ctx.clone()).await?;
1607 }
1608 Ok(())
1609 }
1610
1611 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1612 let mut clients = BTreeMap::new();
1613 for (provider, connector) in &self.connectors {
1614 let client = connector.lock().await.client();
1615 clients.insert(provider.clone(), client);
1616 }
1617 clients
1618 }
1619
1620 pub async fn activate_all(
1621 &self,
1622 registry: &TriggerRegistry,
1623 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1624 let mut handles = Vec::new();
1625 for (provider, connector) in &self.connectors {
1626 let bindings = registry.bindings_for(provider);
1627 if bindings.is_empty() {
1628 continue;
1629 }
1630 let connector = connector.lock().await;
1631 handles.push(connector.activate(bindings).await?);
1632 }
1633 Ok(handles)
1634 }
1635}
1636
1637impl Default for ConnectorRegistry {
1638 fn default() -> Self {
1639 Self::with_defaults()
1640 }
1641}
1642
1643fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1644 if provider.provider == "github" {
1654 return Box::new(GitHubConnector::new());
1655 }
1656 if provider.provider == "linear" {
1657 return Box::new(LinearConnector::new());
1658 }
1659 if provider.provider == "slack" {
1660 return Box::new(SlackConnector::new());
1661 }
1662 if provider.provider == "notion" {
1663 return Box::new(NotionConnector::new());
1664 }
1665 if provider.provider == "a2a-push" {
1666 return Box::new(A2aPushConnector::new());
1667 }
1668 match &provider.runtime {
1669 ProviderRuntimeMetadata::Builtin {
1670 connector,
1671 default_signature_variant,
1672 } => match connector.as_str() {
1673 "cron" => Box::new(CronConnector::new()),
1674 "stream" => Box::new(StreamConnector::new(
1675 ProviderId::from(provider.provider.clone()),
1676 provider.schema_name.clone(),
1677 )),
1678 "webhook" => {
1679 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1680 .expect("catalog webhook signature variant must be valid");
1681 Box::new(GenericWebhookConnector::with_profile(
1682 WebhookProviderProfile::new(
1683 ProviderId::from(provider.provider.clone()),
1684 provider.schema_name.clone(),
1685 variant,
1686 ),
1687 ))
1688 }
1689 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1690 },
1691 ProviderRuntimeMetadata::Placeholder => {
1692 Box::new(PlaceholderConnector::from_metadata(provider))
1693 }
1694 }
1695}
1696
1697struct PlaceholderConnector {
1698 provider_id: ProviderId,
1699 kinds: Vec<TriggerKind>,
1700 schema_name: String,
1701}
1702
1703impl PlaceholderConnector {
1704 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1705 Self {
1706 provider_id: ProviderId::from(metadata.provider.clone()),
1707 kinds: metadata
1708 .kinds
1709 .iter()
1710 .cloned()
1711 .map(TriggerKind::from)
1712 .collect(),
1713 schema_name: metadata.schema_name.clone(),
1714 }
1715 }
1716}
1717
1718struct PlaceholderClient;
1719
1720#[async_trait]
1721impl ConnectorClient for PlaceholderClient {
1722 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1723 Err(ClientError::Other(format!(
1724 "connector client method '{method}' is not implemented for this provider"
1725 )))
1726 }
1727}
1728
1729#[async_trait]
1730impl Connector for PlaceholderConnector {
1731 fn provider_id(&self) -> &ProviderId {
1732 &self.provider_id
1733 }
1734
1735 fn kinds(&self) -> &[TriggerKind] {
1736 &self.kinds
1737 }
1738
1739 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1740 Ok(())
1741 }
1742
1743 async fn activate(
1744 &self,
1745 bindings: &[TriggerBinding],
1746 ) -> Result<ActivationHandle, ConnectorError> {
1747 Ok(ActivationHandle::new(
1748 self.provider_id.clone(),
1749 bindings.len(),
1750 ))
1751 }
1752
1753 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1754 Err(ConnectorError::Unsupported(format!(
1755 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1756 self.provider_id.as_str()
1757 )))
1758 }
1759
1760 fn payload_schema(&self) -> ProviderPayloadSchema {
1761 ProviderPayloadSchema::named(self.schema_name.clone())
1762 }
1763
1764 fn client(&self) -> Arc<dyn ConnectorClient> {
1765 Arc::new(PlaceholderClient)
1766 }
1767}
1768
1769pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1770 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1771 *slot.borrow_mut() = clients
1772 .into_iter()
1773 .map(|(provider, client)| (provider.as_str().to_string(), client))
1774 .collect();
1775 });
1776}
1777
1778pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1779 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1780}
1781
1782pub fn clear_active_connector_clients() {
1783 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1784}
1785
#[cfg(test)]
mod tests {
    //! Unit tests for the connector registry, rate limiter factory, raw
    //! inbound JSON parsing, and Prometheus metrics rendering.

    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client stub: echoes the invoked method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Connector stub that counts `activate` invocations via a shared counter,
    // so tests can assert which connectors were (not) activated.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering a second connector under an already-taken provider id must
    // fail with DuplicateProvider naming that provider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // activate_all must call `activate` only on connectors that have at least
    // one trigger binding, and report the binding count per provider.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        // slack had no bindings, so it must never have been activated.
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Buckets are independent per (provider, key): exhausting one scope must
    // not affect a different key or a different provider.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        // capacity 1 with a 60s refill so the second acquire within a scope
        // deterministically fails during the test.
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // json_body parses the stored bytes on demand without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is seeded from the provider catalog, including the
    // builtin cron/webhook engines and first-party connectors.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // End-to-end render check: touch every recorder on MetricsRegistry once,
    // then assert the Prometheus text output contains each expected sample
    // line (labels sorted, values formatted, histogram buckets populated).
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}