1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47 connector_export_denied_builtin_reason, connector_export_effect_class,
48 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61 SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
/// Cloneable, shared handle to a connector; the async mutex serializes access
/// to the boxed trait object across tasks.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
74
thread_local! {
    /// Per-thread map of live connector clients, keyed by a string id
    /// (presumably the provider id — usage lives outside this chunk; confirm).
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
79
/// One trigger-provider integration (Slack, GitHub, cron, stream, …).
///
/// A connector is initialized once with a [`ConnectorCtx`], activated against
/// a set of trigger bindings, and then asked to turn raw inbound deliveries
/// into normalized [`TriggerEvent`]s. It also exposes an RPC-style client for
/// outbound calls to the provider.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector integrates.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can service.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time setup with shared runtime services (event log, secrets,
    /// inbox, metrics, rate limiting).
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the connector for the given bindings, returning a handle
    /// representing the activation.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown; the default implementation is a no-op that ignores
    /// the deadline.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Convert one raw inbound delivery into a single normalized trigger event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization entry point that can yield batches, immediate HTTP
    /// responses, or rejections. Defaults to wrapping `normalize_inbound` as a
    /// single-event result.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing the payloads this provider produces.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// RPC-style client for outbound calls to the provider.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
123
/// Minimal HTTP response a connector can ask the ingress layer to send back
/// to the delivery sender.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code.
    pub status: u16,
    /// Response headers; the sorted map means duplicate header names collapse.
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
131
132impl ConnectorHttpResponse {
133 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
134 Self {
135 status,
136 headers,
137 body,
138 }
139 }
140}
141
/// Outcome of normalizing one raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one normalized event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Several normalized events produced by one delivery.
    Batch(Vec<TriggerEvent>),
    /// Respond to the sender immediately while still emitting events.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the delivery with the given response; no events are emitted.
    Reject(ConnectorHttpResponse),
}
153
154impl ConnectorNormalizeResult {
155 pub fn event(event: TriggerEvent) -> Self {
156 Self::Event(Box::new(event))
157 }
158
159 pub fn into_events(self) -> Vec<TriggerEvent> {
160 match self {
161 Self::Event(event) => vec![*event],
162 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
163 Self::Reject(_) => Vec::new(),
164 }
165 }
166}
167
/// Result of shared post-normalization processing.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event passed dedupe (or dedupe was disabled) and may be dispatched.
    Ready(Box<TriggerEvent>),
    /// The event's dedupe key was already claimed; the event should be dropped.
    DuplicateDropped,
}
173
174pub async fn postprocess_normalized_event(
175 inbox: &InboxIndex,
176 binding_id: &str,
177 dedupe_enabled: bool,
178 dedupe_ttl: StdDuration,
179 mut event: TriggerEvent,
180) -> Result<PostNormalizeOutcome, ConnectorError> {
181 if dedupe_enabled && !event.dedupe_claimed() {
182 if !inbox
183 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
184 .await?
185 {
186 return Ok(PostNormalizeOutcome::DuplicateDropped);
187 }
188 event.mark_dedupe_claimed();
189 }
190
191 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
192}
193
/// Uniform RPC surface for outbound provider calls: a named method plus JSON
/// arguments in, JSON out.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke `method` with `args`; failures are classified via [`ClientError`].
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
199
/// Failure classes for [`ConnectorClient::call`]; each carries a
/// human-readable message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method is not implemented by this client.
    MethodNotFound(String),
    /// Arguments failed validation.
    InvalidArgs(String),
    /// The provider throttled the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Anything that fits no other class.
    Other(String),
}
209
210impl fmt::Display for ClientError {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 match self {
213 Self::MethodNotFound(message)
214 | Self::InvalidArgs(message)
215 | Self::RateLimited(message)
216 | Self::Transport(message)
217 | Self::Other(message) => message.fmt(f),
218 }
219 }
220}
221
222impl std::error::Error for ClientError {}
223
/// Errors produced by connector registration, verification, normalization,
/// and activation paths.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider id is already registered.
    DuplicateProvider(String),
    /// This delivery was already processed (dedupe hit).
    DuplicateDelivery(String),
    /// No connector registered for this provider id.
    UnknownProvider(String),
    /// A required HTTP header is absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// A signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event-log failure.
    EventLog(String),
    /// Failure inside the harn connector runtime.
    HarnRuntime(String),
    /// Outbound client call failure, kept structured.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Activation of trigger bindings failed.
    Activation(String),
}
249
250impl ConnectorError {
251 pub fn invalid_signature(message: impl Into<String>) -> Self {
252 Self::InvalidSignature(message.into())
253 }
254}
255
256impl fmt::Display for ConnectorError {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 match self {
259 Self::DuplicateProvider(provider) => {
260 write!(f, "connector provider `{provider}` is already registered")
261 }
262 Self::DuplicateDelivery(message) => message.fmt(f),
263 Self::UnknownProvider(provider) => {
264 write!(f, "connector provider `{provider}` is not registered")
265 }
266 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
267 Self::InvalidHeader { name, detail } => {
268 write!(f, "invalid header `{name}`: {detail}")
269 }
270 Self::InvalidSignature(message)
271 | Self::Json(message)
272 | Self::Secret(message)
273 | Self::EventLog(message)
274 | Self::HarnRuntime(message)
275 | Self::Unsupported(message)
276 | Self::Activation(message) => message.fmt(f),
277 Self::TimestampOutOfWindow {
278 timestamp,
279 now,
280 window,
281 } => write!(
282 f,
283 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
284 ),
285 Self::Client(error) => error.fmt(f),
286 }
287 }
288}
289
290impl std::error::Error for ConnectorError {}
291
292impl From<crate::event_log::LogError> for ConnectorError {
293 fn from(value: crate::event_log::LogError) -> Self {
294 Self::EventLog(value.to_string())
295 }
296}
297
298impl From<crate::secrets::SecretError> for ConnectorError {
299 fn from(value: crate::secrets::SecretError) -> Self {
300 Self::Secret(value.to_string())
301 }
302}
303
304impl From<serde_json::Error> for ConnectorError {
305 fn from(value: serde_json::Error) -> Self {
306 Self::Json(value.to_string())
307 }
308}
309
310impl From<ClientError> for ConnectorError {
311 fn from(value: ClientError) -> Self {
312 Self::Client(value)
313 }
314}
315
/// Shared runtime services handed to every connector at `init`.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log.
    pub event_log: Arc<AnyEventLog>,
    /// Secret material (signing keys, tokens).
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Process-wide metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
325
/// Point-in-time copy of the registry's fixed counters and gauges
/// (see [`MetricsRegistry::snapshot`]).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    // Inbox dedupe counters.
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    // Gauge: current number of live inbox entries.
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    // Dispatch/retry counters.
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    // Slack delivery counters.
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
342
/// Label set for one metric sample; the sorted map makes label order
/// deterministic in rendered output.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one histogram series: cumulative bucket counts keyed
/// by the rendered `le` bound (including `+Inf`), plus total count and sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
351
/// Process-global slot holding the currently active metrics registry, if any.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
353
354pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
355 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
356 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
357}
358
359pub fn clear_active_metrics_registry() {
360 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
361 *slot.lock().expect("active metrics registry poisoned") = None;
362 }
363}
364
365pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
366 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
367 slot.lock()
368 .expect("active metrics registry poisoned")
369 .clone()
370 })
371}
372
/// Process-wide metrics sink: lock-free atomics for the fixed connector
/// counters, plus mutex-guarded maps for labeled counters, gauges, and
/// histograms rendered in Prometheus text format.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed counters/gauges, updated with relaxed atomics (hot paths).
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form named counters (name is sanitized at render time).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labeled metric families keyed by (family name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events: label set -> (event id -> accepted-at ms),
    // backing the oldest-pending-age gauge.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
394
impl MetricsRegistry {
    /// Bucket bounds (seconds) for short operation latencies (HTTP, log appends, hops).
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket bounds (seconds) for trigger lifecycle latencies, up to five minutes.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket bounds (bytes) for payload/body size histograms, 128 B to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Copy all fixed counters/gauges into an owned snapshot. Loads are
    /// relaxed, so fields may be mutually slightly stale — fine for reporting.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Count one dedupe key newly written to the inbox.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one duplicate detected by the fast path (bumps both the total and
    /// the fast-path breakdown).
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one duplicate detected by the durable store (bumps both the total
    /// and the durable breakdown).
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Add `count` expired inbox entries; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrite (gauge semantics) the current number of live inbox entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Bump the Linear timestamp-rejection counter.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count one failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Add `amount` to a free-form named counter. Zero increments are skipped,
    /// so a counter only appears in rendered output once it has been bumped.
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        *counters.entry(name.to_string()).or_default() += amount;
    }

    /// Record one HTTP request: a labeled request counter plus duration and
    /// body-size histograms keyed by endpoint.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }

    /// Count one trigger event received for (trigger, provider).
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count one trigger event dropped as a duplicate, labeled with the reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Record one predicate evaluation: a counter labeled by boolean result
    /// and a cost histogram (negative costs clamped to zero).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count one trigger dispatch, labeled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count one retry, labeled by the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count one event routed to the dead-letter queue, labeled by reason.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Observe webhook accepted -> normalized latency for one event.
    pub fn record_trigger_accepted_to_normalized(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe webhook accepted -> queue-append latency for one event.
    pub fn record_trigger_accepted_to_queue_append(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe how old an event was when admitted for dispatch.
    pub fn record_trigger_queue_age_at_dispatch_admission(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe how old an event was when dispatch actually started.
    pub fn record_trigger_queue_age_at_dispatch_start(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe the runtime of one dispatch.
    pub fn record_trigger_dispatch_runtime(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_dispatch_runtime_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe the delay before a scheduled retry.
    pub fn record_trigger_retry_delay(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_retry_delay_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe accepted -> dead-letter-queue latency for one event.
    pub fn record_trigger_accepted_to_dlq(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_accepted_to_dlq_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Register a pending trigger event (event id -> accepted-at ms under its
    /// label set), then refresh the oldest-pending-age gauge.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Inner scope releases the pending-events lock before the gauge
        // refresh re-acquires it.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Remove a pending trigger event (dropping its label group when it
    /// empties), then refresh the oldest-pending-age gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Inner scope releases the pending-events lock before the gauge
        // refresh re-acquires it.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Gauge: number of in-flight executions for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Gauge: today's spend for a trigger (negative values clamped to zero).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Count one budget-exhaustion event, labeled by the applied strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Count one backpressure action, labeled by dimension and action taken.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }

    /// Record one event-log append: a duration histogram plus a per-topic
    /// size gauge set to the payload size in bytes.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Gauge: a consumer's lag on a topic.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record one agent-to-agent hop: an outcome-labeled counter plus a
    /// per-target duration histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Gauge: current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observe how long a queue item waited before being claimed
    /// (negative ages clamped to zero).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Gauge: orchestrator pump backlog for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Gauge: orchestrator pump outstanding items for a topic.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Observe the orchestrator pump's admission delay for a topic.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Record one LLM call: an outcome-labeled counter plus a cost counter.
    /// When the cost is zero/negative the cost series is still materialized
    /// (at its current value) so it renders from the first call onward.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Count one LLM cache hit for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }

    /// Render everything in Prometheus text exposition format: the fixed
    /// counters, sanitized custom counters, two hard-coded config gauges, and
    /// finally the generic labeled families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize the user-supplied name: anything outside
            // [A-Za-z0-9_] becomes '_'.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }

    /// Add to a labeled counter; non-positive or non-finite amounts are
    /// ignored to keep counters monotonic.
    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
        let amount = amount.into();
        if amount <= 0.0 || !amount.is_finite() {
            return;
        }
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        *counters.entry((name.to_string(), labels)).or_default() += amount;
    }

    /// Materialize a counter series at its current value (0.0 if new) without
    /// incrementing it, so the series still renders.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Overwrite a labeled gauge.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }

    /// Fold one observation into a labeled histogram. Bucket counts are
    /// cumulative (every bound >= value is incremented, plus `+Inf`);
    /// non-finite values are dropped. The bucket key set is seeded from
    /// `bucket_bounds` on the series' first observation.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }

    /// Recompute the oldest-pending-age gauge for one label set from the
    /// pending-events map; 0.0 when nothing is pending under those labels.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }

    /// Render the generic labeled families. The maps are cloned under their
    /// locks first so rendering happens without holding any lock; family
    /// names/kinds come from the static `metric_family_names` tables.
    fn render_generic_metrics(&self, rendered: &mut String) {
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Per-bucket samples carry the extra `le` label.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
}
1101
/// Prometheus family kinds used to select the static name tables in
/// `metric_family_names`.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1108
/// Static registry of every generic metric family rendered by
/// `render_generic_metrics`, grouped by kind. A family must be listed here to
/// appear in the Prometheus output at all.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1154
1155fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1156 pairs
1157 .into_iter()
1158 .map(|(name, value)| (name.to_string(), value.to_string()))
1159 .collect()
1160}
1161
1162fn trigger_lifecycle_labels(
1163 trigger_id: &str,
1164 binding_key: &str,
1165 provider: &str,
1166 tenant_id: Option<&str>,
1167 status: &str,
1168) -> MetricLabels {
1169 labels([
1170 ("binding_key", binding_key),
1171 ("provider", provider),
1172 ("status", status),
1173 ("tenant_id", tenant_label(tenant_id)),
1174 ("trigger_id", trigger_id),
1175 ])
1176}
1177
1178fn trigger_pending_labels(
1179 trigger_id: &str,
1180 binding_key: &str,
1181 provider: &str,
1182 tenant_id: Option<&str>,
1183) -> MetricLabels {
1184 labels([
1185 ("binding_key", binding_key),
1186 ("provider", provider),
1187 ("tenant_id", tenant_label(tenant_id)),
1188 ("trigger_id", trigger_id),
1189 ])
1190}
1191
/// Normalizes an optional tenant id into a label value: trimmed when present
/// and non-empty, otherwise the literal `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1198
/// Computes `later_ms - earlier_ms` as a `Duration`, clamping negative
/// differences to zero (saturating on i64 overflow).
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta = later_ms.saturating_sub(earlier_ms);
    let clamped = if delta > 0 { delta as u64 } else { 0 };
    StdDuration::from_millis(clamped)
}
1202
1203fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1204 rendered.push_str(name);
1205 if !labels.is_empty() {
1206 rendered.push('{');
1207 for (index, (label, label_value)) in labels.iter().enumerate() {
1208 if index > 0 {
1209 rendered.push(',');
1210 }
1211 rendered.push_str(label);
1212 rendered.push_str("=\"");
1213 rendered.push_str(&escape_label_value(label_value));
1214 rendered.push('"');
1215 }
1216 rendered.push('}');
1217 }
1218 rendered.push(' ');
1219 rendered.push_str(&prometheus_float(value));
1220 rendered.push('\n');
1221}
1222
/// Escapes a label value for the Prometheus text exposition format:
/// backslash, double quote, and newline must be backslash-escaped.
fn escape_label_value(value: &str) -> String {
    // Single pass into one buffer; the previous flat_map version allocated a
    // Vec<char> for every input character.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1234
/// Renders an `f64` sample value in Prometheus text exposition format.
///
/// Special values use the exposition-format spellings `+Inf`, `-Inf`, and
/// `NaN`. Whole numbers render without a decimal point; fractional values
/// render with up to six decimal places, trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        // Rust's `{:.6}` formatting would also yield "NaN"; make it explicit.
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Bug fix: the previous version only special-cased +Inf, so negative
        // infinity fell through and rendered as Rust's "-inf", which is not a
        // valid Prometheus value.
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        // Six decimals, then strip trailing zeros and any dangling '.'.
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1249
/// Describes the payload a provider emits: a harn-internal schema name plus an
/// optional JSON Schema body (`JsonValue::Null` for name-only schemas).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    // Harn-internal schema identifier; `"raw"` is the Default impl's choice.
    pub harn_schema_name: String,
    // JSON Schema body; deserializes to Null when absent.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1257
1258impl ProviderPayloadSchema {
1259 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1260 Self {
1261 harn_schema_name: harn_schema_name.into(),
1262 json_schema,
1263 }
1264 }
1265
1266 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1267 Self::new(harn_schema_name, JsonValue::Null)
1268 }
1269}
1270
impl Default for ProviderPayloadSchema {
    /// The default schema is the name-only `"raw"` schema.
    fn default() -> Self {
        Self::named("raw")
    }
}
1276
/// Newtype over a provider-defined trigger kind string (e.g. `"webhook"`);
/// serializes transparently as the inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1281
1282impl TriggerKind {
1283 pub fn new(value: impl Into<String>) -> Self {
1284 Self(value.into())
1285 }
1286
1287 pub fn as_str(&self) -> &str {
1288 self.0.as_str()
1289 }
1290}
1291
1292impl From<&str> for TriggerKind {
1293 fn from(value: &str) -> Self {
1294 Self::new(value)
1295 }
1296}
1297
1298impl From<String> for TriggerKind {
1299 fn from(value: String) -> Self {
1300 Self::new(value)
1301 }
1302}
1303
/// One registered trigger for a provider: which kind of event it matches and
/// how inbox deduplication behaves for it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    pub provider: ProviderId,
    pub kind: TriggerKind,
    // Stable identifier for this binding, e.g. "github.push".
    pub binding_id: String,
    // Optional explicit dedupe key; None presumably means the provider's
    // default dedupe behavior applies — confirm at the dedupe call sites.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    // How long dedupe records are retained; defaults to the crate-wide
    // DEFAULT_INBOX_RETENTION_DAYS when absent from serialized form.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    // Provider-specific configuration blob; Null when absent.
    #[serde(default)]
    pub config: JsonValue,
}
1317
1318impl TriggerBinding {
1319 pub fn new(
1320 provider: ProviderId,
1321 kind: impl Into<TriggerKind>,
1322 binding_id: impl Into<String>,
1323 ) -> Self {
1324 Self {
1325 provider,
1326 kind: kind.into(),
1327 binding_id: binding_id.into(),
1328 dedupe_key: None,
1329 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1330 config: JsonValue::Null,
1331 }
1332 }
1333}
1334
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1338
/// Collects trigger bindings grouped by provider, in sorted provider order.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1344
1345impl TriggerRegistry {
1346 pub fn register(&mut self, binding: TriggerBinding) {
1347 self.bindings
1348 .entry(binding.provider.clone())
1349 .or_default()
1350 .push(binding);
1351 }
1352
1353 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1354 &self.bindings
1355 }
1356
1357 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1358 self.bindings
1359 .get(provider)
1360 .map(Vec::as_slice)
1361 .unwrap_or(&[])
1362 }
1363}
1364
/// Result of activating a connector: the provider plus how many bindings were
/// activated for it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    pub provider: ProviderId,
    pub binding_count: usize,
}
1371
1372impl ActivationHandle {
1373 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1374 Self {
1375 provider,
1376 binding_count,
1377 }
1378 }
1379}
1380
/// An unnormalized inbound event as received from a provider transport:
/// raw headers, query parameters, and body bytes, plus receipt metadata.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    pub kind: String,
    pub headers: BTreeMap<String, String>,
    pub query: BTreeMap<String, String>,
    pub body: Vec<u8>,
    // Stamped from the (test-controllable) clock at construction time.
    pub received_at: OffsetDateTime,
    // When the provider reports the event actually happened, if known.
    pub occurred_at: Option<OffsetDateTime>,
    pub tenant_id: Option<TenantId>,
    // Transport-specific extras; Null when there are none.
    pub metadata: JsonValue,
}
1393
1394impl RawInbound {
1395 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1396 Self {
1397 kind: kind.into(),
1398 headers,
1399 query: BTreeMap::new(),
1400 body,
1401 received_at: clock::now_utc(),
1402 occurred_at: None,
1403 tenant_id: None,
1404 metadata: JsonValue::Null,
1405 }
1406 }
1407
1408 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1409 Ok(serde_json::from_slice(&self.body)?)
1410 }
1411}
1412
/// Token-bucket rate limit parameters: a burst `capacity` and a steady refill
/// of `refill_tokens` per `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}
1420
1421impl Default for RateLimitConfig {
1422 fn default() -> Self {
1423 Self {
1424 capacity: 60,
1425 refill_tokens: 1,
1426 refill_interval: StdDuration::from_secs(1),
1427 }
1428 }
1429}
1430
/// Mutable token-bucket state; tokens are fractional between refills.
#[derive(Clone, Debug)]
struct TokenBucket {
    tokens: f64,
    // Clock instant of the last refill; elapsed time since this stamp earns tokens.
    last_refill: ClockInstant,
}
1436
impl TokenBucket {
    /// Creates a bucket filled to capacity, stamped with the current clock instant.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Credits tokens earned since `last_refill` at the configured rate,
    /// clamped to capacity, then advances the refill stamp to `now`.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        // The max() guards keep the rate finite even for a zero interval or
        // zero refill_tokens configuration.
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refills, then takes one token if at least one whole token is available.
    /// Returns whether a token was taken.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimates how long until one whole token is available. No refill is
    /// applied here; callers are expected to refill via `try_acquire`. Never
    /// returns less than 1ms when a wait is needed, to avoid hot retry loops.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1473
/// Hands out token buckets keyed by `(provider, key)`, all sharing one
/// `RateLimitConfig`. Buckets are created lazily on first use.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    // Guarded by a std Mutex; held only for synchronous bucket updates.
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1480
1481impl RateLimiterFactory {
1482 pub fn new(config: RateLimitConfig) -> Self {
1483 Self {
1484 config,
1485 buckets: Mutex::new(HashMap::new()),
1486 }
1487 }
1488
1489 pub fn config(&self) -> RateLimitConfig {
1490 self.config
1491 }
1492
1493 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1494 ScopedRateLimiter {
1495 factory: self,
1496 provider: provider.clone(),
1497 key: key.into(),
1498 }
1499 }
1500
1501 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1502 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1503 let bucket = buckets
1504 .entry((provider.as_str().to_string(), key.to_string()))
1505 .or_insert_with(|| TokenBucket::full(self.config));
1506 bucket.try_acquire(self.config, clock::instant_now())
1507 }
1508
1509 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1510 loop {
1511 let wait = {
1512 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1513 let bucket = buckets
1514 .entry((provider.as_str().to_string(), key.to_string()))
1515 .or_insert_with(|| TokenBucket::full(self.config));
1516 if bucket.try_acquire(self.config, clock::instant_now()) {
1517 return;
1518 }
1519 bucket.wait_duration(self.config)
1520 };
1521 tokio::time::sleep(wait).await;
1522 }
1523 }
1524}
1525
impl Default for RateLimiterFactory {
    /// A factory using the default rate limit (60 capacity, 1 token/second).
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1531
/// A `(provider, key)` scope pre-bound to its factory, so callers can acquire
/// tokens without repeating the scope on every call.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    factory: &'a RateLimiterFactory,
    provider: ProviderId,
    key: String,
}
1539
1540impl<'a> ScopedRateLimiter<'a> {
1541 pub fn try_acquire(&self) -> bool {
1542 self.factory.try_acquire(&self.provider, &self.key)
1543 }
1544
1545 pub async fn acquire(&self) {
1546 self.factory.acquire(&self.provider, &self.key).await;
1547 }
1548}
1549
/// Holds at most one connector per provider, in sorted provider order.
pub struct ConnectorRegistry {
    // `ConnectorHandle` is declared elsewhere in this module; based on
    // `register` it looks like Arc<AsyncMutex<Box<dyn Connector>>> — confirm
    // at its declaration.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1554
1555impl ConnectorRegistry {
1556 pub fn empty() -> Self {
1557 Self {
1558 connectors: BTreeMap::new(),
1559 }
1560 }
1561
1562 pub fn with_defaults() -> Self {
1563 let mut registry = Self::empty();
1564 for provider in registered_provider_metadata() {
1565 registry
1566 .register(default_connector_for_provider(&provider))
1567 .expect("default connector registration should not fail");
1568 }
1569 registry
1570 }
1571
1572 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1573 let provider = connector.provider_id().clone();
1574 if self.connectors.contains_key(&provider) {
1575 return Err(ConnectorError::DuplicateProvider(provider.0));
1576 }
1577 self.connectors
1578 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1579 Ok(())
1580 }
1581
1582 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1583 self.connectors.get(id).cloned()
1584 }
1585
1586 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1587 self.connectors.remove(id)
1588 }
1589
1590 pub fn list(&self) -> Vec<ProviderId> {
1591 self.connectors.keys().cloned().collect()
1592 }
1593
1594 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1595 for connector in self.connectors.values() {
1596 connector.lock().await.init(ctx.clone()).await?;
1597 }
1598 Ok(())
1599 }
1600
1601 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1602 let mut clients = BTreeMap::new();
1603 for (provider, connector) in &self.connectors {
1604 let client = connector.lock().await.client();
1605 clients.insert(provider.clone(), client);
1606 }
1607 clients
1608 }
1609
1610 pub async fn activate_all(
1611 &self,
1612 registry: &TriggerRegistry,
1613 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1614 let mut handles = Vec::new();
1615 for (provider, connector) in &self.connectors {
1616 let bindings = registry.bindings_for(provider);
1617 if bindings.is_empty() {
1618 continue;
1619 }
1620 let connector = connector.lock().await;
1621 handles.push(connector.activate(bindings).await?);
1622 }
1623 Ok(handles)
1624 }
1625}
1626
impl Default for ConnectorRegistry {
    /// The default registry carries connectors for the full provider catalog.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1632
1633fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1634 if provider.provider == "github" {
1644 return Box::new(GitHubConnector::new());
1645 }
1646 if provider.provider == "linear" {
1647 return Box::new(LinearConnector::new());
1648 }
1649 if provider.provider == "slack" {
1650 return Box::new(SlackConnector::new());
1651 }
1652 if provider.provider == "notion" {
1653 return Box::new(NotionConnector::new());
1654 }
1655 if provider.provider == "a2a-push" {
1656 return Box::new(A2aPushConnector::new());
1657 }
1658 match &provider.runtime {
1659 ProviderRuntimeMetadata::Builtin {
1660 connector,
1661 default_signature_variant,
1662 } => match connector.as_str() {
1663 "cron" => Box::new(CronConnector::new()),
1664 "stream" => Box::new(StreamConnector::new(
1665 ProviderId::from(provider.provider.clone()),
1666 provider.schema_name.clone(),
1667 )),
1668 "webhook" => {
1669 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1670 .expect("catalog webhook signature variant must be valid");
1671 Box::new(GenericWebhookConnector::with_profile(
1672 WebhookProviderProfile::new(
1673 ProviderId::from(provider.provider.clone()),
1674 provider.schema_name.clone(),
1675 variant,
1676 ),
1677 ))
1678 }
1679 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1680 },
1681 ProviderRuntimeMetadata::Placeholder => {
1682 Box::new(PlaceholderConnector::from_metadata(provider))
1683 }
1684 }
1685}
1686
/// Stand-in connector for providers that are cataloged but have no concrete
/// implementation yet; it accepts activation but rejects inbound traffic.
struct PlaceholderConnector {
    provider_id: ProviderId,
    kinds: Vec<TriggerKind>,
    schema_name: String,
}
1692
1693impl PlaceholderConnector {
1694 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1695 Self {
1696 provider_id: ProviderId::from(metadata.provider.clone()),
1697 kinds: metadata
1698 .kinds
1699 .iter()
1700 .cloned()
1701 .map(TriggerKind::from)
1702 .collect(),
1703 schema_name: metadata.schema_name.clone(),
1704 }
1705 }
1706}
1707
/// Client for placeholder connectors; every call fails with `ClientError::Other`.
struct PlaceholderClient;
1709
1710#[async_trait]
1711impl ConnectorClient for PlaceholderClient {
1712 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1713 Err(ClientError::Other(format!(
1714 "connector client method '{method}' is not implemented for this provider"
1715 )))
1716 }
1717}
1718
1719#[async_trait]
1720impl Connector for PlaceholderConnector {
1721 fn provider_id(&self) -> &ProviderId {
1722 &self.provider_id
1723 }
1724
1725 fn kinds(&self) -> &[TriggerKind] {
1726 &self.kinds
1727 }
1728
1729 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1730 Ok(())
1731 }
1732
1733 async fn activate(
1734 &self,
1735 bindings: &[TriggerBinding],
1736 ) -> Result<ActivationHandle, ConnectorError> {
1737 Ok(ActivationHandle::new(
1738 self.provider_id.clone(),
1739 bindings.len(),
1740 ))
1741 }
1742
1743 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1744 Err(ConnectorError::Unsupported(format!(
1745 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1746 self.provider_id.as_str()
1747 )))
1748 }
1749
1750 fn payload_schema(&self) -> ProviderPayloadSchema {
1751 ProviderPayloadSchema::named(self.schema_name.clone())
1752 }
1753
1754 fn client(&self) -> Arc<dyn ConnectorClient> {
1755 Arc::new(PlaceholderClient)
1756 }
1757}
1758
1759pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1760 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1761 *slot.borrow_mut() = clients
1762 .into_iter()
1763 .map(|(provider, client)| (provider.as_str().to_string(), client))
1764 .collect();
1765 });
1766}
1767
1768pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1769 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1770}
1771
1772pub fn clear_active_connector_clients() {
1773 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1774}
1775
// Unit tests covering connector registration, rate limiting, raw inbound
// parsing, and the Prometheus metrics rendering pipeline.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client that echoes the called method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test double that counts `activate` invocations via a shared atomic.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Records the call, then succeeds with the binding count.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // A second connector for an already-registered provider must be rejected
    // with DuplicateProvider naming that provider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // activate_all must call activate exactly once per bound provider and
    // skip connectors that have no trigger bindings.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Each (provider, key) pair gets an independent bucket: exhausting one
    // scope must not affect sibling scopes.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // json_body must parse the stored body bytes as-is.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry must contain connectors for cataloged providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // End-to-end smoke test: record one sample for every metric family and
    // verify each appears in the rendered Prometheus exposition text.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}