// harn_vm/connectors/mod.rs
1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47    connector_export_denied_builtin_reason, connector_export_effect_class,
48    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61    SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
72const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
74pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
75    reqwest::Client::builder()
76        .user_agent(user_agent)
77        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
78        .build()
79        .expect("connector HTTP client configuration should be valid")
80}
81
/// Shared owned handle to a connector instance registered with the runtime.
///
/// The async mutex serializes access so `&mut self` entry points such as
/// [`Connector::init`] can run against a shared handle.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread registry of outbound clients keyed by string (presumably the
    // provider id). NOTE(review): no reads/writes are visible in this chunk —
    // confirm it is populated by dispatch code elsewhere in the file.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
89
/// Provider implementation contract for inbound connectors.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable provider id such as `github`, `slack`, or `webhook`.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
    fn kinds(&self) -> &[TriggerKind];

    /// Called once per connector instance at orchestrator startup.
    ///
    /// Receives the shared runtime context (event log, secrets, inbox,
    /// metrics, rate limiting) the connector needs for later calls.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the bindings relevant to this connector instance.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Stop connector-owned background work and flush any connector-local state.
    ///
    /// The default implementation is a no-op that ignores `_deadline`;
    /// connectors owning background tasks should override it.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Verify + normalize a provider-native inbound request into the richer
    /// connector result contract used by ack-first webhook adapters.
    ///
    /// The default delegates to [`Connector::normalize_inbound`] and wraps the
    /// single event as `ConnectorNormalizeResult::Event`.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Payload schema surfaced to future trigger-type narrowing.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Outbound API wrapper exposed to handlers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
133
/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    // HTTP status code to send back to the provider.
    pub status: u16,
    // Response headers, kept sorted for deterministic serialization.
    pub headers: BTreeMap<String, String>,
    // JSON response body.
    pub body: JsonValue,
}
141
142impl ConnectorHttpResponse {
143    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
144        Self {
145            status,
146            headers,
147            body,
148        }
149    }
150}
151
/// Normalized inbound result accepted by the runtime connector adapter.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one normalized event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Several events produced from a single inbound request.
    Batch(Vec<TriggerEvent>),
    /// Respond to the provider immediately, then dispatch `events`.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the request with the given response; nothing is dispatched.
    Reject(ConnectorHttpResponse),
}
163
164impl ConnectorNormalizeResult {
165    pub fn event(event: TriggerEvent) -> Self {
166        Self::Event(Box::new(event))
167    }
168
169    pub fn into_events(self) -> Vec<TriggerEvent> {
170        match self {
171            Self::Event(event) => vec![*event],
172            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
173            Self::Reject(_) => Vec::new(),
174        }
175    }
176}
177
/// Outcome of post-normalization dedupe processing for a single event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe (or dedupe was disabled) and is ready to dispatch.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed; drop silently.
    DuplicateDropped,
}
183
184pub async fn postprocess_normalized_event(
185    inbox: &InboxIndex,
186    binding_id: &str,
187    dedupe_enabled: bool,
188    dedupe_ttl: StdDuration,
189    mut event: TriggerEvent,
190) -> Result<PostNormalizeOutcome, ConnectorError> {
191    if dedupe_enabled && !event.dedupe_claimed() {
192        if !inbox
193            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
194            .await?
195        {
196            return Ok(PostNormalizeOutcome::DuplicateDropped);
197        }
198        event.mark_dedupe_claimed();
199    }
200
201    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
202}
203
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke a provider API method by name with JSON arguments, returning
    /// the provider's JSON result or a [`ClientError`].
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
209
/// Minimal outbound client errors shared by connector implementations.
/// Every variant carries a human-readable message as its sole payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested API method name is not recognized by this client.
    MethodNotFound(String),
    /// Arguments failed validation before the call was made.
    InvalidArgs(String),
    /// The provider rate-limited the call.
    RateLimited(String),
    /// Network / transport-level failure.
    Transport(String),
    /// Any other failure mode.
    Other(String),
}
219
220impl fmt::Display for ClientError {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        match self {
223            Self::MethodNotFound(message)
224            | Self::InvalidArgs(message)
225            | Self::RateLimited(message)
226            | Self::Transport(message)
227            | Self::Other(message) => message.fmt(f),
228        }
229    }
230}
231
232impl std::error::Error for ClientError {}
233
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector with the same provider id is already registered.
    DuplicateProvider(String),
    /// The delivery id was already processed (dedupe hit).
    DuplicateDelivery(String),
    /// No connector is registered for the requested provider id.
    UnknownProvider(String),
    /// A required inbound header is absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification of the inbound request failed.
    InvalidSignature(String),
    /// Inbound timestamp fell outside the replay-protection window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event log append/read failure.
    EventLog(String),
    /// Failure inside the embedded harn runtime.
    HarnRuntime(String),
    /// Outbound client call failure.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}
259
260impl ConnectorError {
261    pub fn invalid_signature(message: impl Into<String>) -> Self {
262        Self::InvalidSignature(message.into())
263    }
264}
265
266impl fmt::Display for ConnectorError {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        match self {
269            Self::DuplicateProvider(provider) => {
270                write!(f, "connector provider `{provider}` is already registered")
271            }
272            Self::DuplicateDelivery(message) => message.fmt(f),
273            Self::UnknownProvider(provider) => {
274                write!(f, "connector provider `{provider}` is not registered")
275            }
276            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
277            Self::InvalidHeader { name, detail } => {
278                write!(f, "invalid header `{name}`: {detail}")
279            }
280            Self::InvalidSignature(message)
281            | Self::Json(message)
282            | Self::Secret(message)
283            | Self::EventLog(message)
284            | Self::HarnRuntime(message)
285            | Self::Unsupported(message)
286            | Self::Activation(message) => message.fmt(f),
287            Self::TimestampOutOfWindow {
288                timestamp,
289                now,
290                window,
291            } => write!(
292                f,
293                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
294            ),
295            Self::Client(error) => error.fmt(f),
296        }
297    }
298}
299
300impl std::error::Error for ConnectorError {}
301
302impl From<crate::event_log::LogError> for ConnectorError {
303    fn from(value: crate::event_log::LogError) -> Self {
304        Self::EventLog(value.to_string())
305    }
306}
307
308impl From<crate::secrets::SecretError> for ConnectorError {
309    fn from(value: crate::secrets::SecretError) -> Self {
310        Self::Secret(value.to_string())
311    }
312}
313
314impl From<serde_json::Error> for ConnectorError {
315    fn from(value: serde_json::Error) -> Self {
316        Self::Json(value.to_string())
317    }
318}
319
320impl From<ClientError> for ConnectorError {
321    fn from(value: ClientError) -> Self {
322        Self::Client(value)
323    }
324}
325
/// Startup context shared with connector instances.
#[derive(Clone)]
pub struct ConnectorCtx {
    // Append-only event log for audit/dispatch records.
    pub event_log: Arc<AnyEventLog>,
    // Secret lookup used for signature keys, API tokens, etc.
    pub secrets: Arc<dyn SecretProvider>,
    // Dedupe index consulted during post-normalization.
    pub inbox: Arc<InboxIndex>,
    // Connector-local metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    // Factory for outbound-call rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
335
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Field-for-field copy of the fixed atomic counters in [`MetricsRegistry`],
/// read with relaxed ordering (values may be slightly stale under load).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
352
// Sorted label set used as part of every generic metric key.
type MetricLabels = BTreeMap<String, String>;

/// Prometheus-style cumulative histogram state for one (name, labels) series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // Bucket upper-bound (as string) -> cumulative observation count.
    buckets: BTreeMap<String, u64>,
    // Total number of observations.
    count: u64,
    // Sum of all observed values.
    sum: f64,
}
361
362static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
363
364pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
365    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
366    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
367}
368
369pub fn clear_active_metrics_registry() {
370    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
371        *slot.lock().expect("active metrics registry poisoned") = None;
372    }
373}
374
375pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
376    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
377        slot.lock()
378            .expect("active metrics registry poisoned")
379            .clone()
380    })
381}
382
/// Shared metrics surface for connector-local counters and timings.
///
/// Hot fixed counters are lock-free atomics; everything else (generic labeled
/// counters, gauges, histograms, pending-event tracking) lives behind mutexes.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed inbox/dispatch counters, read back via `snapshot()`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Ad-hoc named counters (no labels).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Generic labeled series keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per-label-set map of pending event id -> accepted-at timestamp (ms).
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
404
405impl MetricsRegistry {
    // Histogram bucket bounds for sub-second operation latencies (seconds).
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    // Wider bounds for trigger lifecycle latencies, up to five minutes (seconds).
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    // Bounds for payload sizes, 128 B up to 10 MiB (bytes).
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
413
414    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
415        ConnectorMetricsSnapshot {
416            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
417            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
418            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
419            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
420            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
421            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
422            linear_timestamp_rejections_total: self
423                .linear_timestamp_rejections_total
424                .load(Ordering::Relaxed),
425            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
426            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
427            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
428            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
429            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
430        }
431    }
432
433    pub(crate) fn record_inbox_claim(&self) {
434        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
435    }
436
437    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
438        self.inbox_duplicates_rejected
439            .fetch_add(1, Ordering::Relaxed);
440        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
441    }
442
443    pub(crate) fn record_inbox_duplicate_durable(&self) {
444        self.inbox_duplicates_rejected
445            .fetch_add(1, Ordering::Relaxed);
446        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
447    }
448
449    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
450        if count > 0 {
451            self.inbox_expired_entries
452                .fetch_add(count, Ordering::Relaxed);
453        }
454    }
455
456    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
457        self.inbox_active_entries
458            .store(count as u64, Ordering::Relaxed);
459    }
460
    /// Count a Linear webhook rejected for a stale/out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful trigger dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed trigger dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a retry being scheduled for a failed dispatch.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a Slack delivery that succeeded.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a Slack delivery that failed.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
488
489    pub fn record_custom_counter(&self, name: &str, amount: u64) {
490        if amount == 0 {
491            return;
492        }
493        let mut counters = self
494            .custom_counters
495            .lock()
496            .expect("custom counters poisoned");
497        *counters.entry(name.to_string()).or_default() += amount;
498    }
499
500    pub fn record_http_request(
501        &self,
502        endpoint: &str,
503        method: &str,
504        status: u16,
505        duration: StdDuration,
506        body_size_bytes: usize,
507    ) {
508        self.increment_counter(
509            "harn_http_requests_total",
510            labels([
511                ("endpoint", endpoint),
512                ("method", method),
513                ("status", &status.to_string()),
514            ]),
515            1,
516        );
517        self.observe_histogram(
518            "harn_http_request_duration_seconds",
519            labels([("endpoint", endpoint)]),
520            duration.as_secs_f64(),
521            &Self::DURATION_BUCKETS,
522        );
523        self.observe_histogram(
524            "harn_http_body_size_bytes",
525            labels([("endpoint", endpoint)]),
526            body_size_bytes as f64,
527            &Self::SIZE_BUCKETS,
528        );
529    }
530
    /// Count an inbound event received for a trigger, labeled by provider.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count an event dropped as a duplicate, labeled by dedupe reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Count one predicate evaluation (labeled by boolean result) and record
    /// its USD cost; negative costs are clamped to zero.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count a dispatch attempt, labeled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count a retry, labeled by the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count an event routed to the dead-letter queue, labeled by reason.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
599
600    pub fn record_trigger_accepted_to_normalized(
601        &self,
602        trigger_id: &str,
603        binding_key: &str,
604        provider: &str,
605        tenant_id: Option<&str>,
606        status: &str,
607        duration: StdDuration,
608    ) {
609        self.observe_histogram(
610            "harn_trigger_webhook_accepted_to_normalized_seconds",
611            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
612            duration.as_secs_f64(),
613            &Self::TRIGGER_LATENCY_BUCKETS,
614        );
615    }
616
617    pub fn record_trigger_accepted_to_queue_append(
618        &self,
619        trigger_id: &str,
620        binding_key: &str,
621        provider: &str,
622        tenant_id: Option<&str>,
623        status: &str,
624        duration: StdDuration,
625    ) {
626        self.observe_histogram(
627            "harn_trigger_webhook_accepted_to_queue_append_seconds",
628            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
629            duration.as_secs_f64(),
630            &Self::TRIGGER_LATENCY_BUCKETS,
631        );
632    }
633
634    pub fn record_trigger_queue_age_at_dispatch_admission(
635        &self,
636        trigger_id: &str,
637        binding_key: &str,
638        provider: &str,
639        tenant_id: Option<&str>,
640        status: &str,
641        age: StdDuration,
642    ) {
643        self.observe_histogram(
644            "harn_trigger_queue_age_at_dispatch_admission_seconds",
645            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
646            age.as_secs_f64(),
647            &Self::TRIGGER_LATENCY_BUCKETS,
648        );
649    }
650
651    pub fn record_trigger_queue_age_at_dispatch_start(
652        &self,
653        trigger_id: &str,
654        binding_key: &str,
655        provider: &str,
656        tenant_id: Option<&str>,
657        status: &str,
658        age: StdDuration,
659    ) {
660        self.observe_histogram(
661            "harn_trigger_queue_age_at_dispatch_start_seconds",
662            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
663            age.as_secs_f64(),
664            &Self::TRIGGER_LATENCY_BUCKETS,
665        );
666    }
667
668    pub fn record_trigger_dispatch_runtime(
669        &self,
670        trigger_id: &str,
671        binding_key: &str,
672        provider: &str,
673        tenant_id: Option<&str>,
674        status: &str,
675        duration: StdDuration,
676    ) {
677        self.observe_histogram(
678            "harn_trigger_dispatch_runtime_seconds",
679            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
680            duration.as_secs_f64(),
681            &Self::TRIGGER_LATENCY_BUCKETS,
682        );
683    }
684
685    pub fn record_trigger_retry_delay(
686        &self,
687        trigger_id: &str,
688        binding_key: &str,
689        provider: &str,
690        tenant_id: Option<&str>,
691        status: &str,
692        duration: StdDuration,
693    ) {
694        self.observe_histogram(
695            "harn_trigger_retry_delay_seconds",
696            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
697            duration.as_secs_f64(),
698            &Self::TRIGGER_LATENCY_BUCKETS,
699        );
700    }
701
702    pub fn record_trigger_accepted_to_dlq(
703        &self,
704        trigger_id: &str,
705        binding_key: &str,
706        provider: &str,
707        tenant_id: Option<&str>,
708        status: &str,
709        duration: StdDuration,
710    ) {
711        self.observe_histogram(
712            "harn_trigger_accepted_to_dlq_seconds",
713            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
714            duration.as_secs_f64(),
715            &Self::TRIGGER_LATENCY_BUCKETS,
716        );
717    }
718
    /// Register `event_id` as pending for the given trigger/binding labels,
    /// recording when it was accepted, then refresh the oldest-pending gauge.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Inner block scopes the mutex guard so the gauge refresh below runs
        // without holding the pending-events lock.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Remove `event_id` from the pending set for its labels (dropping the
    /// label entry entirely once empty), then refresh the oldest-pending gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Same lock-scoping pattern as `note_trigger_pending_event` above.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
767
    /// Set the current number of in-flight dispatches for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Set today's accumulated budget spend for a trigger (clamped at zero).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Count a budget-exhaustion event, labeled by the applied strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Count a backpressure action taken along a particular dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
799
    /// Record one event-log append: latency histogram plus a topic-size gauge
    /// set to the size of the appended payload.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Set a consumer's lag behind the head of a topic.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record an agent-to-agent hop: outcome counter plus per-target latency.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Set the current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Record how old a job was when claimed (clamped at zero seconds).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
857
858    /// Increment the scheduler-selection counter for a particular fairness key.
859    pub fn record_scheduler_selection(
860        &self,
861        queue: &str,
862        fairness_dimension: &str,
863        fairness_key: &str,
864    ) {
865        self.increment_counter(
866            "harn_scheduler_selections_total",
867            labels([
868                ("queue", queue),
869                ("fairness_dimension", fairness_dimension),
870                ("fairness_key", fairness_key),
871            ]),
872            1,
873        );
874    }
875
876    /// Increment the scheduler-deferred counter (queue had work but couldn't
877    /// be selected because the key was at its concurrency cap).
878    pub fn record_scheduler_deferral(
879        &self,
880        queue: &str,
881        fairness_dimension: &str,
882        fairness_key: &str,
883    ) {
884        self.increment_counter(
885            "harn_scheduler_deferrals_total",
886            labels([
887                ("queue", queue),
888                ("fairness_dimension", fairness_dimension),
889                ("fairness_key", fairness_key),
890            ]),
891            1,
892        );
893    }
894
895    /// Increment the scheduler starvation-promotion counter.
896    pub fn record_scheduler_starvation_promotion(
897        &self,
898        queue: &str,
899        fairness_dimension: &str,
900        fairness_key: &str,
901    ) {
902        self.increment_counter(
903            "harn_scheduler_starvation_promotions_total",
904            labels([
905                ("queue", queue),
906                ("fairness_dimension", fairness_dimension),
907                ("fairness_key", fairness_key),
908            ]),
909            1,
910        );
911    }
912
913    /// Set the current scheduler deficit gauge for a fairness key.
914    pub fn set_scheduler_deficit(
915        &self,
916        queue: &str,
917        fairness_dimension: &str,
918        fairness_key: &str,
919        deficit: i64,
920    ) {
921        self.set_gauge(
922            "harn_scheduler_deficit",
923            labels([
924                ("queue", queue),
925                ("fairness_dimension", fairness_dimension),
926                ("fairness_key", fairness_key),
927            ]),
928            deficit as f64,
929        );
930    }
931
932    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
933    pub fn set_scheduler_oldest_eligible_age(
934        &self,
935        queue: &str,
936        fairness_dimension: &str,
937        fairness_key: &str,
938        age_ms: u64,
939    ) {
940        self.set_gauge(
941            "harn_scheduler_oldest_eligible_age_seconds",
942            labels([
943                ("queue", queue),
944                ("fairness_dimension", fairness_dimension),
945                ("fairness_key", fairness_key),
946            ]),
947            age_ms as f64 / 1000.0,
948        );
949    }
950
951    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
952        self.set_gauge(
953            "harn_orchestrator_pump_backlog",
954            labels([("topic", topic)]),
955            count as f64,
956        );
957    }
958
959    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
960        self.set_gauge(
961            "harn_orchestrator_pump_outstanding",
962            labels([("topic", topic)]),
963            count as f64,
964        );
965    }
966
967    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
968        self.observe_histogram(
969            "harn_orchestrator_pump_admission_delay_seconds",
970            labels([("topic", topic)]),
971            duration.as_secs_f64(),
972            &Self::DURATION_BUCKETS,
973        );
974    }
975
976    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
977        self.increment_counter(
978            "harn_llm_calls_total",
979            labels([
980                ("provider", provider),
981                ("model", model),
982                ("outcome", outcome),
983            ]),
984            1,
985        );
986        if cost_usd > 0.0 {
987            self.increment_counter(
988                "harn_llm_cost_usd_total",
989                labels([("provider", provider), ("model", model)]),
990                cost_usd,
991            );
992        } else {
993            self.ensure_counter(
994                "harn_llm_cost_usd_total",
995                labels([("provider", provider), ("model", model)]),
996            );
997        }
998    }
999
1000    pub fn record_llm_cache_hit(&self, provider: &str) {
1001        self.increment_counter(
1002            "harn_llm_cache_hits_total",
1003            labels([("provider", provider)]),
1004            1,
1005        );
1006    }
1007
    /// Render all metrics in Prometheus text exposition format.
    ///
    /// Output order: legacy fixed counters from the snapshot, sanitized
    /// custom counters, two hard-coded Slack auto-disable config gauges, then
    /// every generic counter/gauge/histogram family.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Legacy label-less counters sourced from the snapshot struct.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize arbitrary counter names into a Prometheus-safe
            // `connector_custom_<name>_total` form (non-[A-Za-z0-9_] -> '_').
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static config gauges advertising the Slack auto-disable thresholds.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
1072
1073    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1074        let amount = amount.into();
1075        if amount <= 0.0 || !amount.is_finite() {
1076            return;
1077        }
1078        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1079        *counters.entry((name.to_string(), labels)).or_default() += amount;
1080    }
1081
1082    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1083        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1084        counters.entry((name.to_string(), labels)).or_default();
1085    }
1086
1087    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1088        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1089        gauges.insert((name.to_string(), labels), value);
1090    }
1091
    /// Record `value` into the named histogram, creating the series with the
    /// given bucket bounds on first observation.
    ///
    /// Buckets are cumulative in the Prometheus style: every bound that
    /// `value` fits under is incremented, and `+Inf` counts every sample.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        // Drop NaN/Inf samples; they would poison `sum`.
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                // Pre-seed one bucket per bound, keyed by its rendered `le`
                // string, plus the mandatory trailing `+Inf` bucket.
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        // The +Inf bucket always counts the sample (cumulative semantics).
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1124
1125    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1126        let oldest_accepted_at_ms = self
1127            .pending_trigger_events
1128            .lock()
1129            .expect("pending trigger events poisoned")
1130            .get(&labels)
1131            .and_then(|events| events.values().min().copied());
1132        let age_seconds = oldest_accepted_at_ms
1133            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1134            .unwrap_or(0.0);
1135        self.set_gauge(
1136            "harn_trigger_oldest_pending_age_seconds",
1137            labels,
1138            age_seconds,
1139        );
1140    }
1141
    /// Append every whitelisted counter/gauge/histogram family (see
    /// `metric_family_names`) to `rendered` in Prometheus text format.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone the stores under their locks so rendering proceeds without
        // holding any of them.
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Bucket samples carry the standard `le` label on top of the
                // series labels.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1203}
1204
/// Internal discriminator selecting which metric-family table
/// [`metric_family_names`] returns during rendering.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1211
/// Static whitelist of generic metric family names, grouped by kind, in the
/// order they are rendered by `render_generic_metrics`.
///
/// NOTE: a metric recorded under a name absent from these tables is stored
/// but never rendered — new metric names must be added here to appear in
/// Prometheus output.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
            "harn_scheduler_selections_total",
            "harn_scheduler_deferrals_total",
            "harn_scheduler_starvation_promotions_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
            "harn_scheduler_deficit",
            "harn_scheduler_oldest_eligible_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1262
1263fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1264    pairs
1265        .into_iter()
1266        .map(|(name, value)| (name.to_string(), value.to_string()))
1267        .collect()
1268}
1269
1270fn trigger_lifecycle_labels(
1271    trigger_id: &str,
1272    binding_key: &str,
1273    provider: &str,
1274    tenant_id: Option<&str>,
1275    status: &str,
1276) -> MetricLabels {
1277    labels([
1278        ("binding_key", binding_key),
1279        ("provider", provider),
1280        ("status", status),
1281        ("tenant_id", tenant_label(tenant_id)),
1282        ("trigger_id", trigger_id),
1283    ])
1284}
1285
1286fn trigger_pending_labels(
1287    trigger_id: &str,
1288    binding_key: &str,
1289    provider: &str,
1290    tenant_id: Option<&str>,
1291) -> MetricLabels {
1292    labels([
1293        ("binding_key", binding_key),
1294        ("provider", provider),
1295        ("tenant_id", tenant_label(tenant_id)),
1296        ("trigger_id", trigger_id),
1297    ])
1298}
1299
/// Normalize an optional tenant id into a metric label value: trimmed when
/// present and non-empty, otherwise the literal `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1306
/// Non-negative duration between two millisecond timestamps; a negative
/// difference (clock went backwards) clamps to zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(delta_ms).unwrap_or(0))
}
1310
1311fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1312    rendered.push_str(name);
1313    if !labels.is_empty() {
1314        rendered.push('{');
1315        for (index, (label, label_value)) in labels.iter().enumerate() {
1316            if index > 0 {
1317                rendered.push(',');
1318            }
1319            rendered.push_str(label);
1320            rendered.push_str("=\"");
1321            rendered.push_str(&escape_label_value(label_value));
1322            rendered.push('"');
1323        }
1324        rendered.push('}');
1325    }
1326    rendered.push(' ');
1327    rendered.push_str(&prometheus_float(value));
1328    rendered.push('\n');
1329}
1330
/// Escape a label value for the Prometheus text format: backslash, double
/// quote, and newline become `\\`, `\"`, and `\n` respectively.
///
/// Rewritten from a `flat_map` that allocated a `Vec<char>` per character to
/// a single pre-sized output buffer with `push_str` — same output, no
/// per-character heap traffic.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1342
/// Render an `f64` in Prometheus text format.
///
/// Special values follow the exposition-format spelling: `+Inf`, `-Inf`,
/// and `NaN`. (Previously `-Inf` fell through to the `{:.6}` path and was
/// emitted as `-inf`, and NaN hit the fractional path — both invalid for
/// Prometheus parsers.) Integral values render without a decimal point;
/// fractional values render with up to six digits, trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1357
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Harn-internal schema name (the default descriptor uses `"raw"`).
    pub harn_schema_name: String,
    /// Optional JSON-schema body; `null` when constructed via `named` or
    /// absent during deserialization.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1365
1366impl ProviderPayloadSchema {
1367    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1368        Self {
1369            harn_schema_name: harn_schema_name.into(),
1370            json_schema,
1371        }
1372    }
1373
1374    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1375        Self::new(harn_schema_name, JsonValue::Null)
1376    }
1377}
1378
1379impl Default for ProviderPayloadSchema {
1380    fn default() -> Self {
1381        Self::named("raw")
1382    }
1383}
1384
/// High-level transport kind a connector supports.
///
/// Serialized transparently as its inner string (`#[serde(transparent)]`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1389
1390impl TriggerKind {
1391    pub fn new(value: impl Into<String>) -> Self {
1392        Self(value.into())
1393    }
1394
1395    pub fn as_str(&self) -> &str {
1396        self.0.as_str()
1397    }
1398}
1399
1400impl From<&str> for TriggerKind {
1401    fn from(value: &str) -> Self {
1402        Self::new(value)
1403    }
1404}
1405
1406impl From<String> for TriggerKind {
1407    fn from(value: String) -> Self {
1408        Self::new(value)
1409    }
1410}
1411
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding is routed to.
    pub provider: ProviderId,
    /// Transport kind within the provider.
    pub kind: TriggerKind,
    /// Caller-chosen identifier for this binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` disables binding-level dedupe keying.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// How long dedupe records are retained; defaults to
    /// `DEFAULT_INBOX_RETENTION_DAYS` when absent.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Free-form provider-specific configuration (`null` by default).
    #[serde(default)]
    pub config: JsonValue,
}
1425
1426impl TriggerBinding {
1427    pub fn new(
1428        provider: ProviderId,
1429        kind: impl Into<TriggerKind>,
1430        binding_id: impl Into<String>,
1431    ) -> Self {
1432        Self {
1433            provider,
1434            kind: kind.into(),
1435            binding_id: binding_id.into(),
1436            dedupe_key: None,
1437            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1438            config: JsonValue::Null,
1439        }
1440    }
1441}
1442
/// Serde default for [`TriggerBinding::dedupe_retention_days`].
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1446
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Bindings grouped by provider id; registration order is preserved
    // within each provider's Vec.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1452
1453impl TriggerRegistry {
1454    pub fn register(&mut self, binding: TriggerBinding) {
1455        self.bindings
1456            .entry(binding.provider.clone())
1457            .or_default()
1458            .push(binding);
1459    }
1460
1461    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1462        &self.bindings
1463    }
1464
1465    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1466        self.bindings
1467            .get(provider)
1468            .map(Vec::as_slice)
1469            .unwrap_or(&[])
1470    }
1471}
1472
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of bindings handed to the connector during activation.
    pub binding_count: usize,
}
1479
1480impl ActivationHandle {
1481    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1482        Self {
1483            provider,
1484            binding_count,
1485        }
1486    }
1487}
1488
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Connector-specific kind label for the inbound payload.
    pub kind: String,
    /// Transport headers, keyed by header name.
    pub headers: BTreeMap<String, String>,
    /// Query parameters, keyed by name (empty when constructed via `new`).
    pub query: BTreeMap<String, String>,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// When the runtime received the payload (clock-stamped in `new`).
    pub received_at: OffsetDateTime,
    /// Provider-reported occurrence time, when known.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant attribution, when known.
    pub tenant_id: Option<TenantId>,
    /// Free-form connector metadata (`null` when constructed via `new`).
    pub metadata: JsonValue,
}
1501
1502impl RawInbound {
1503    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1504        Self {
1505            kind: kind.into(),
1506            headers,
1507            query: BTreeMap::new(),
1508            body,
1509            received_at: clock::now_utc(),
1510            occurred_at: None,
1511            tenant_id: None,
1512            metadata: JsonValue::Null,
1513        }
1514    }
1515
1516    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1517        Ok(serde_json::from_slice(&self.body)?)
1518    }
1519}
1520
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens the bucket can hold (burst size).
    pub capacity: u32,
    /// Tokens added per `refill_interval`.
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` are earned.
    pub refill_interval: StdDuration,
}
1528
1529impl Default for RateLimitConfig {
1530    fn default() -> Self {
1531        Self {
1532            capacity: 60,
1533            refill_tokens: 1,
1534            refill_interval: StdDuration::from_secs(1),
1535        }
1536    }
1537}
1538
/// Mutable token-bucket state for one provider/key scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional token balance; refill accrues continuously.
    tokens: f64,
    // Clock instant of the most recent refill accrual.
    last_refill: ClockInstant,
}
1544
impl TokenBucket {
    /// Start with a completely full bucket stamped at the current clock instant.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Accrue tokens earned since `last_refill`, clamped to capacity.
    ///
    /// `refill_tokens`/`capacity` are clamped to at least 1 and the interval
    /// to at least `f64::EPSILON`, so a zeroed config can neither divide by
    /// zero nor stall refill forever.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refill, then take one token if at least a whole token is available.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// How long to wait before one whole token becomes available (zero when
    /// a token is already available). Clamped to at least 1ms so polling
    /// callers always yield.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1581
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    // One config shared by every bucket this factory creates.
    config: RateLimitConfig,
    // Lazily-created buckets keyed by (provider id, caller-chosen key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1588
1589impl RateLimiterFactory {
1590    pub fn new(config: RateLimitConfig) -> Self {
1591        Self {
1592            config,
1593            buckets: Mutex::new(HashMap::new()),
1594        }
1595    }
1596
1597    pub fn config(&self) -> RateLimitConfig {
1598        self.config
1599    }
1600
1601    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1602        ScopedRateLimiter {
1603            factory: self,
1604            provider: provider.clone(),
1605            key: key.into(),
1606        }
1607    }
1608
1609    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1610        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1611        let bucket = buckets
1612            .entry((provider.as_str().to_string(), key.to_string()))
1613            .or_insert_with(|| TokenBucket::full(self.config));
1614        bucket.try_acquire(self.config, clock::instant_now())
1615    }
1616
1617    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1618        loop {
1619            let wait = {
1620                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1621                let bucket = buckets
1622                    .entry((provider.as_str().to_string(), key.to_string()))
1623                    .or_insert_with(|| TokenBucket::full(self.config));
1624                if bucket.try_acquire(self.config, clock::instant_now()) {
1625                    return;
1626                }
1627                bucket.wait_duration(self.config)
1628            };
1629            tokio::time::sleep(wait).await;
1630        }
1631    }
1632}
1633
1634impl Default for RateLimiterFactory {
1635    fn default() -> Self {
1636        Self::new(RateLimitConfig::default())
1637    }
1638}
1639
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Factory that owns the underlying buckets.
    factory: &'a RateLimiterFactory,
    // Provider half of the bucket key.
    provider: ProviderId,
    // Caller-chosen half of the bucket key.
    key: String,
}
1647
1648impl<'a> ScopedRateLimiter<'a> {
1649    pub fn try_acquire(&self) -> bool {
1650        self.factory.try_acquire(&self.provider, &self.key)
1651    }
1652
1653    pub async fn acquire(&self) {
1654        self.factory.acquire(&self.provider, &self.key).await;
1655    }
1656}
1657
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    // One shared, async-locked connector handle per provider.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1662
impl ConnectorRegistry {
    /// Create a registry with no connectors registered.
    pub fn empty() -> Self {
        Self {
            connectors: BTreeMap::new(),
        }
    }

    /// Create a registry pre-populated with one connector per provider in
    /// the catalog (see `default_connector_for_provider`).
    pub fn with_defaults() -> Self {
        let mut registry = Self::empty();
        for provider in registered_provider_metadata() {
            registry
                .register(default_connector_for_provider(&provider))
                // The catalog has one entry per provider id, so duplicate
                // registration cannot occur here.
                .expect("default connector registration should not fail");
        }
        registry
    }

    /// Register a connector under its own provider id.
    ///
    /// Returns `ConnectorError::DuplicateProvider` when a connector is
    /// already registered for the same provider.
    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
        let provider = connector.provider_id().clone();
        if self.connectors.contains_key(&provider) {
            return Err(ConnectorError::DuplicateProvider(provider.0));
        }
        self.connectors
            .insert(provider, Arc::new(AsyncMutex::new(connector)));
        Ok(())
    }

    /// Look up the shared handle for a provider, if registered.
    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.get(id).cloned()
    }

    /// Remove and return the handle for a provider, if registered.
    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.remove(id)
    }

    /// Provider ids of all registered connectors (BTreeMap key order).
    pub fn list(&self) -> Vec<ProviderId> {
        self.connectors.keys().cloned().collect()
    }

    /// Run `init` on every connector sequentially, stopping at the first error.
    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
        for connector in self.connectors.values() {
            connector.lock().await.init(ctx.clone()).await?;
        }
        Ok(())
    }

    /// Collect each connector's outbound client, keyed by provider id.
    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
        let mut clients = BTreeMap::new();
        for (provider, connector) in &self.connectors {
            let client = connector.lock().await.client();
            clients.insert(provider.clone(), client);
        }
        clients
    }

    /// Activate every connector that has at least one binding registered,
    /// returning one handle per activated connector; stops at the first error.
    pub async fn activate_all(
        &self,
        registry: &TriggerRegistry,
    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
        let mut handles = Vec::new();
        for (provider, connector) in &self.connectors {
            let bindings = registry.bindings_for(provider);
            if bindings.is_empty() {
                // Providers without bindings are skipped entirely.
                continue;
            }
            let connector = connector.lock().await;
            handles.push(connector.activate(bindings).await?);
        }
        Ok(handles)
    }
}
1734
1735impl Default for ConnectorRegistry {
1736    fn default() -> Self {
1737        Self::with_defaults()
1738    }
1739}
1740
1741fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1742    // The provider catalog on main registers `github` with
1743    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1744    // before a native connector existed the catalog auto-wired a
1745    // GenericWebhookConnector. Now that #170 lands a first-class
1746    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1747    // provider_id "github" here and return the native connector instead of a
1748    // webhook-backed fallback. This keeps manifests that say
1749    // `provider = "github"` pointed at the new connector without requiring
1750    // users to switch to a distinct provider_id.
1751    if provider.provider == "github" {
1752        return Box::new(GitHubConnector::new());
1753    }
1754    if provider.provider == "linear" {
1755        return Box::new(LinearConnector::new());
1756    }
1757    if provider.provider == "slack" {
1758        return Box::new(SlackConnector::new());
1759    }
1760    if provider.provider == "notion" {
1761        return Box::new(NotionConnector::new());
1762    }
1763    if provider.provider == "a2a-push" {
1764        return Box::new(A2aPushConnector::new());
1765    }
1766    match &provider.runtime {
1767        ProviderRuntimeMetadata::Builtin {
1768            connector,
1769            default_signature_variant,
1770        } => match connector.as_str() {
1771            "cron" => Box::new(CronConnector::new()),
1772            "stream" => Box::new(StreamConnector::new(
1773                ProviderId::from(provider.provider.clone()),
1774                provider.schema_name.clone(),
1775            )),
1776            "webhook" => {
1777                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1778                    .expect("catalog webhook signature variant must be valid");
1779                Box::new(GenericWebhookConnector::with_profile(
1780                    WebhookProviderProfile::new(
1781                        ProviderId::from(provider.provider.clone()),
1782                        provider.schema_name.clone(),
1783                        variant,
1784                    ),
1785                ))
1786            }
1787            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1788        },
1789        ProviderRuntimeMetadata::Placeholder => {
1790            Box::new(PlaceholderConnector::from_metadata(provider))
1791        }
1792    }
1793}
1794
/// Catalog-driven stand-in for providers without a concrete connector yet:
/// activates successfully but rejects inbound normalization and client calls.
struct PlaceholderConnector {
    // Provider id copied from catalog metadata.
    provider_id: ProviderId,
    // Trigger kinds advertised by the catalog entry.
    kinds: Vec<TriggerKind>,
    // Payload schema name advertised by the catalog entry.
    schema_name: String,
}
1800
1801impl PlaceholderConnector {
1802    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1803        Self {
1804            provider_id: ProviderId::from(metadata.provider.clone()),
1805            kinds: metadata
1806                .kinds
1807                .iter()
1808                .cloned()
1809                .map(TriggerKind::from)
1810                .collect(),
1811            schema_name: metadata.schema_name.clone(),
1812        }
1813    }
1814}
1815
/// Client stub paired with `PlaceholderConnector`; every method call fails.
struct PlaceholderClient;
1817
1818#[async_trait]
1819impl ConnectorClient for PlaceholderClient {
1820    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1821        Err(ClientError::Other(format!(
1822            "connector client method '{method}' is not implemented for this provider"
1823        )))
1824    }
1825}
1826
1827#[async_trait]
1828impl Connector for PlaceholderConnector {
1829    fn provider_id(&self) -> &ProviderId {
1830        &self.provider_id
1831    }
1832
1833    fn kinds(&self) -> &[TriggerKind] {
1834        &self.kinds
1835    }
1836
1837    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1838        Ok(())
1839    }
1840
1841    async fn activate(
1842        &self,
1843        bindings: &[TriggerBinding],
1844    ) -> Result<ActivationHandle, ConnectorError> {
1845        Ok(ActivationHandle::new(
1846            self.provider_id.clone(),
1847            bindings.len(),
1848        ))
1849    }
1850
1851    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1852        Err(ConnectorError::Unsupported(format!(
1853            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1854            self.provider_id.as_str()
1855        )))
1856    }
1857
1858    fn payload_schema(&self) -> ProviderPayloadSchema {
1859        ProviderPayloadSchema::named(self.schema_name.clone())
1860    }
1861
1862    fn client(&self) -> Arc<dyn ConnectorClient> {
1863        Arc::new(PlaceholderClient)
1864    }
1865}
1866
1867pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1868    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1869        *slot.borrow_mut() = clients
1870            .into_iter()
1871            .map(|(provider, client)| (provider.as_str().to_string(), client))
1872            .collect();
1873    });
1874}
1875
1876pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1877    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1878}
1879
1880pub fn clear_active_connector_clients() {
1881    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1882}
1883
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Client stub whose `call` echoes the method name back; used wherever a
    // test only needs *some* `ConnectorClient` to exist.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Minimal `Connector` implementation that counts `activate` invocations so
    // registry tests can assert exactly which connectors were activated.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        // Shared counter incremented on every `activate` call.
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            // Record the activation so tests can assert call counts.
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering the same provider id twice must fail with
    // `ConnectorError::DuplicateProvider` carrying that provider's name.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // `activate_all` should activate only connectors that have at least one
    // trigger binding (github here), exactly once, and skip the rest (slack).
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Token buckets must be independent per (provider, key) pair: exhausting
    // one scope must not consume tokens from another.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        // capacity 1 means the second acquire in the same scope must fail.
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body()` must parse the stored raw bytes rather than some
    // re-encoded copy of the payload.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry should be pre-populated from the provider catalog,
    // including builtin connectors like cron and webhook.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Smoke test for the metrics surface: record one sample through every
    // orchestrator-facing recorder, then assert the rendered Prometheus text
    // contains an exactly-formatted sample line for each metric family.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Latency-stage histograms: durations are chosen to land in a known
        // bucket so the `le=...` needle below matches deterministically.
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Each needle is a full Prometheus sample line (labels sorted, value
        // included) so this also pins label ordering and formatting.
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}