Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47    connector_export_denied_builtin_reason, connector_export_effect_class,
48    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61    SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
72const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
74pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
75    reqwest::Client::builder()
76        .user_agent(user_agent)
77        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
78        .build()
79        .expect("connector HTTP client configuration should be valid")
80}
81
/// Shared owned handle to a connector instance registered with the runtime.
///
/// The async mutex serializes access to the connector; the boxed trait object
/// keeps the handle type-erased so heterogeneous connectors can share one map.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread registry of outbound clients keyed by a string id.
    // NOTE(review): being thread-local, installs are invisible to other
    // threads — confirm callers read on the same thread that installs.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
89
90/// Provider implementation contract for inbound connectors.
91#[async_trait]
92pub trait Connector: Send + Sync {
93    /// Stable provider id such as `github`, `slack`, or `webhook`.
94    fn provider_id(&self) -> &ProviderId;
95
96    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
97    fn kinds(&self) -> &[TriggerKind];
98
99    /// Called once per connector instance at orchestrator startup.
100    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
101
102    /// Activate the bindings relevant to this connector instance.
103    async fn activate(
104        &self,
105        bindings: &[TriggerBinding],
106    ) -> Result<ActivationHandle, ConnectorError>;
107
108    /// Stop connector-owned background work and flush any connector-local state.
109    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
110        Ok(())
111    }
112
113    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
114    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
115
116    /// Verify + normalize a provider-native inbound request into the richer
117    /// connector result contract used by ack-first webhook adapters.
118    async fn normalize_inbound_result(
119        &self,
120        raw: RawInbound,
121    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
122        self.normalize_inbound(raw)
123            .await
124            .map(ConnectorNormalizeResult::event)
125    }
126
127    /// Payload schema surfaced to future trigger-type narrowing.
128    fn payload_schema(&self) -> ProviderPayloadSchema;
129
130    /// Outbound API wrapper exposed to handlers.
131    fn client(&self) -> Arc<dyn ConnectorClient>;
132}
133
/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code to return to the inbound caller.
    pub status: u16,
    /// Response headers (BTreeMap keeps ordering deterministic).
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
141
142impl ConnectorHttpResponse {
143    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
144        Self {
145            status,
146            headers,
147            body,
148        }
149    }
150}
151
/// Normalized inbound result accepted by the runtime connector adapter.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one trigger event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Zero or more trigger events from one inbound delivery.
    Batch(Vec<TriggerEvent>),
    /// Respond to the caller immediately, then dispatch `events`.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the delivery with this response; nothing is dispatched.
    Reject(ConnectorHttpResponse),
}
163
164impl ConnectorNormalizeResult {
165    pub fn event(event: TriggerEvent) -> Self {
166        Self::Event(Box::new(event))
167    }
168
169    pub fn into_events(self) -> Vec<TriggerEvent> {
170        match self {
171            Self::Event(event) => vec![*event],
172            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
173            Self::Reject(_) => Vec::new(),
174        }
175    }
176}
177
/// Outcome of post-normalization dedupe processing.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe (or dedupe was disabled) and is ready to dispatch.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed; the event is dropped.
    DuplicateDropped,
}
183
184pub async fn postprocess_normalized_event(
185    inbox: &InboxIndex,
186    binding_id: &str,
187    dedupe_enabled: bool,
188    dedupe_ttl: StdDuration,
189    mut event: TriggerEvent,
190) -> Result<PostNormalizeOutcome, ConnectorError> {
191    if dedupe_enabled && !event.dedupe_claimed() {
192        if !inbox
193            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
194            .await?
195        {
196            return Ok(PostNormalizeOutcome::DuplicateDropped);
197        }
198        event.mark_dedupe_claimed();
199    }
200
201    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
202}
203
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke a named provider method with JSON arguments, returning the
    /// provider's JSON response or a client-level error.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
209
/// Minimal outbound client errors shared by connector implementations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method name is not exposed by this client.
    MethodNotFound(String),
    /// Arguments failed validation before any call was made.
    InvalidArgs(String),
    /// The provider rate-limited the request.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Any other provider-specific failure.
    Other(String),
}
219
220impl fmt::Display for ClientError {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        match self {
223            Self::MethodNotFound(message)
224            | Self::InvalidArgs(message)
225            | Self::RateLimited(message)
226            | Self::Transport(message)
227            | Self::Other(message) => message.fmt(f),
228        }
229    }
230}
231
232impl std::error::Error for ClientError {}
233
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A provider with the same id is already registered.
    DuplicateProvider(String),
    /// The inbound delivery was already processed (dedupe hit).
    DuplicateDelivery(String),
    /// No provider registered under the given id.
    UnknownProvider(String),
    /// A required inbound header was absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// Inbound timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event-log failure.
    EventLog(String),
    /// Failure surfaced from the harn runtime.
    HarnRuntime(String),
    /// Outbound client call failure.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}
259
260impl ConnectorError {
261    pub fn invalid_signature(message: impl Into<String>) -> Self {
262        Self::InvalidSignature(message.into())
263    }
264}
265
266impl fmt::Display for ConnectorError {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        match self {
269            Self::DuplicateProvider(provider) => {
270                write!(f, "connector provider `{provider}` is already registered")
271            }
272            Self::DuplicateDelivery(message) => message.fmt(f),
273            Self::UnknownProvider(provider) => {
274                write!(f, "connector provider `{provider}` is not registered")
275            }
276            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
277            Self::InvalidHeader { name, detail } => {
278                write!(f, "invalid header `{name}`: {detail}")
279            }
280            Self::InvalidSignature(message)
281            | Self::Json(message)
282            | Self::Secret(message)
283            | Self::EventLog(message)
284            | Self::HarnRuntime(message)
285            | Self::Unsupported(message)
286            | Self::Activation(message) => message.fmt(f),
287            Self::TimestampOutOfWindow {
288                timestamp,
289                now,
290                window,
291            } => write!(
292                f,
293                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
294            ),
295            Self::Client(error) => error.fmt(f),
296        }
297    }
298}
299
300impl std::error::Error for ConnectorError {}
301
302impl From<crate::event_log::LogError> for ConnectorError {
303    fn from(value: crate::event_log::LogError) -> Self {
304        Self::EventLog(value.to_string())
305    }
306}
307
308impl From<crate::secrets::SecretError> for ConnectorError {
309    fn from(value: crate::secrets::SecretError) -> Self {
310        Self::Secret(value.to_string())
311    }
312}
313
314impl From<serde_json::Error> for ConnectorError {
315    fn from(value: serde_json::Error) -> Self {
316        Self::Json(value.to_string())
317    }
318}
319
320impl From<ClientError> for ConnectorError {
321    fn from(value: ClientError) -> Self {
322        Self::Client(value)
323    }
324}
325
/// Startup context shared with connector instances.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Secret material lookup (signing keys, API tokens).
    pub secrets: Arc<dyn SecretProvider>,
    /// Durable dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Shared metrics sink for connector-local counters and timings.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for per-connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
335
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Values are point-in-time reads of the atomic counters held by
/// `MetricsRegistry`; fields are individually accurate but not guaranteed
/// to be mutually consistent.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
352
/// Label set attached to a metric series (sorted map gives stable identity).
type MetricLabels = BTreeMap<String, String>;

/// Histogram state: per-bucket counts plus running count and sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // NOTE(review): key appears to be the bucket bound rendered as a string —
    // confirm against `observe_histogram` (defined later in this file).
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
361
362static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
363
364pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
365    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
366    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
367}
368
369pub fn clear_active_metrics_registry() {
370    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
371        *slot.lock().expect("active metrics registry poisoned") = None;
372    }
373}
374
375pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
376    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
377        slot.lock()
378            .expect("active metrics registry poisoned")
379            .clone()
380    })
381}
382
/// Shared metrics surface for connector-local counters and timings.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Inbox/dedupe counters (lock-free atomics).
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    // Provider-specific and dispatch counters (lock-free atomics).
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Dynamic metric families keyed by name (and labels), behind mutexes.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events: label set -> (event_id -> accepted_at_ms).
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
404
405impl MetricsRegistry {
    // Histogram bucket bounds (seconds) for short operations such as HTTP calls.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    // Wider bounds (seconds) for end-to-end trigger lifecycle latencies.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    // Payload-size bounds (bytes), 128 B through 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
413
    /// Point-in-time copy of the atomic counters for tests and diagnostics.
    ///
    /// Loads use `Relaxed` ordering: each field is accurate on its own, but
    /// the set is not a consistent cross-field snapshot.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }
432
    /// Count a successful new dedupe claim.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate caught by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate caught by the durable index.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Accumulate expired entries reported by a sweep; no-op when zero.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrite the gauge of currently live inbox entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }
460
    /// Count a Linear webhook rejected for a timestamp outside the window.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
488
489    pub fn record_custom_counter(&self, name: &str, amount: u64) {
490        if amount == 0 {
491            return;
492        }
493        let mut counters = self
494            .custom_counters
495            .lock()
496            .expect("custom counters poisoned");
497        *counters.entry(name.to_string()).or_default() += amount;
498    }
499
    /// Record one HTTP request: a request counter labeled by endpoint,
    /// method, and status, plus duration (seconds) and body-size (bytes)
    /// histograms labeled by endpoint only.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }
530
    /// Count an inbound trigger event received for a trigger/provider pair.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count a trigger event dropped as a duplicate, labeled by reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Record one predicate evaluation: a result-labeled counter plus a
    /// cost histogram (cost clamped at >= 0, custom USD buckets).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count a dispatch, labeled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count a retry, labeled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count a trigger event routed to the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
599
600    pub fn record_trigger_accepted_to_normalized(
601        &self,
602        trigger_id: &str,
603        binding_key: &str,
604        provider: &str,
605        tenant_id: Option<&str>,
606        status: &str,
607        duration: StdDuration,
608    ) {
609        self.observe_histogram(
610            "harn_trigger_webhook_accepted_to_normalized_seconds",
611            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
612            duration.as_secs_f64(),
613            &Self::TRIGGER_LATENCY_BUCKETS,
614        );
615    }
616
617    pub fn record_trigger_accepted_to_queue_append(
618        &self,
619        trigger_id: &str,
620        binding_key: &str,
621        provider: &str,
622        tenant_id: Option<&str>,
623        status: &str,
624        duration: StdDuration,
625    ) {
626        self.observe_histogram(
627            "harn_trigger_webhook_accepted_to_queue_append_seconds",
628            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
629            duration.as_secs_f64(),
630            &Self::TRIGGER_LATENCY_BUCKETS,
631        );
632    }
633
634    pub fn record_trigger_queue_age_at_dispatch_admission(
635        &self,
636        trigger_id: &str,
637        binding_key: &str,
638        provider: &str,
639        tenant_id: Option<&str>,
640        status: &str,
641        age: StdDuration,
642    ) {
643        self.observe_histogram(
644            "harn_trigger_queue_age_at_dispatch_admission_seconds",
645            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
646            age.as_secs_f64(),
647            &Self::TRIGGER_LATENCY_BUCKETS,
648        );
649    }
650
651    pub fn record_trigger_queue_age_at_dispatch_start(
652        &self,
653        trigger_id: &str,
654        binding_key: &str,
655        provider: &str,
656        tenant_id: Option<&str>,
657        status: &str,
658        age: StdDuration,
659    ) {
660        self.observe_histogram(
661            "harn_trigger_queue_age_at_dispatch_start_seconds",
662            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
663            age.as_secs_f64(),
664            &Self::TRIGGER_LATENCY_BUCKETS,
665        );
666    }
667
668    pub fn record_trigger_dispatch_runtime(
669        &self,
670        trigger_id: &str,
671        binding_key: &str,
672        provider: &str,
673        tenant_id: Option<&str>,
674        status: &str,
675        duration: StdDuration,
676    ) {
677        self.observe_histogram(
678            "harn_trigger_dispatch_runtime_seconds",
679            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
680            duration.as_secs_f64(),
681            &Self::TRIGGER_LATENCY_BUCKETS,
682        );
683    }
684
685    pub fn record_trigger_retry_delay(
686        &self,
687        trigger_id: &str,
688        binding_key: &str,
689        provider: &str,
690        tenant_id: Option<&str>,
691        status: &str,
692        duration: StdDuration,
693    ) {
694        self.observe_histogram(
695            "harn_trigger_retry_delay_seconds",
696            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
697            duration.as_secs_f64(),
698            &Self::TRIGGER_LATENCY_BUCKETS,
699        );
700    }
701
702    pub fn record_trigger_accepted_to_dlq(
703        &self,
704        trigger_id: &str,
705        binding_key: &str,
706        provider: &str,
707        tenant_id: Option<&str>,
708        status: &str,
709        duration: StdDuration,
710    ) {
711        self.observe_histogram(
712            "harn_trigger_accepted_to_dlq_seconds",
713            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
714            duration.as_secs_f64(),
715            &Self::TRIGGER_LATENCY_BUCKETS,
716        );
717    }
718
    /// Register a trigger event as pending (accepted but not yet resolved)
    /// and refresh the oldest-pending gauge for its label set.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Scope the lock so it is released before the gauge refresh below.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Remove a pending trigger event and refresh the oldest-pending gauge;
    /// label groups left empty are pruned from the map.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        // Scope the lock so it is released before the gauge refresh below.
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
767
    /// Gauge: events currently in flight for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Gauge: today's accrued budget spend for a trigger (clamped at >= 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counter: a trigger budget was exhausted, labeled with the strategy applied.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counter: a backpressure action taken along some dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
799
    /// Record an event-log append: an append-duration histogram plus a
    /// per-topic size gauge.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        // NOTE(review): the gauge is set to this append's payload size, not a
        // cumulative topic size — confirm the metric name matches intent.
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Gauge: consumer lag per topic/consumer pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record one agent-to-agent hop: an outcome-labeled counter plus a
    /// per-target duration histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
840
    /// Gauge: current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Histogram: age of a queue claim in seconds (clamped at >= 0).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Gauge: backlog of the orchestrator pump for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Gauge: items admitted by the pump but not yet completed, per topic.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Histogram: delay before the pump admits work for a topic.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
882
    /// Record an LLM call: an outcome-labeled call counter plus cost
    /// accumulation. When the call cost nothing, the cost series is still
    /// materialized (via `ensure_counter`) so dashboards see the series as
    /// soon as a provider/model pair is first used.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counter: an LLM response served from cache for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }
914
915    pub fn render_prometheus(&self) -> String {
916        let snapshot = self.snapshot();
917        let counters = [
918            (
919                "connector_linear_timestamp_rejections_total",
920                snapshot.linear_timestamp_rejections_total,
921            ),
922            (
923                "dispatch_succeeded_total",
924                snapshot.dispatch_succeeded_total,
925            ),
926            ("dispatch_failed_total", snapshot.dispatch_failed_total),
927            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
928            ("retry_scheduled_total", snapshot.retry_scheduled_total),
929            (
930                "slack_events_delivery_success_total",
931                snapshot.slack_delivery_success_total,
932            ),
933            (
934                "slack_events_delivery_failure_total",
935                snapshot.slack_delivery_failure_total,
936            ),
937        ];
938
939        let mut rendered = String::new();
940        for (name, value) in counters {
941            rendered.push_str("# TYPE ");
942            rendered.push_str(name);
943            rendered.push_str(" counter\n");
944            rendered.push_str(name);
945            rendered.push(' ');
946            rendered.push_str(&value.to_string());
947            rendered.push('\n');
948        }
949        let custom_counters = self
950            .custom_counters
951            .lock()
952            .expect("custom counters poisoned");
953        for (name, value) in custom_counters.iter() {
954            let metric_name = format!(
955                "connector_custom_{}_total",
956                name.chars()
957                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
958                        ch
959                    } else {
960                        '_'
961                    })
962                    .collect::<String>()
963            );
964            rendered.push_str("# TYPE ");
965            rendered.push_str(&metric_name);
966            rendered.push_str(" counter\n");
967            rendered.push_str(&metric_name);
968            rendered.push(' ');
969            rendered.push_str(&value.to_string());
970            rendered.push('\n');
971        }
972        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
973        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
974        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
975        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
976        self.render_generic_metrics(&mut rendered);
977        rendered
978    }
979
980    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
981        let amount = amount.into();
982        if amount <= 0.0 || !amount.is_finite() {
983            return;
984        }
985        let mut counters = self.counters.lock().expect("metrics counters poisoned");
986        *counters.entry((name.to_string(), labels)).or_default() += amount;
987    }
988
989    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
990        let mut counters = self.counters.lock().expect("metrics counters poisoned");
991        counters.entry((name.to_string(), labels)).or_default();
992    }
993
994    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
995        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
996        gauges.insert((name.to_string(), labels), value);
997    }
998
999    fn observe_histogram(
1000        &self,
1001        name: &str,
1002        labels: MetricLabels,
1003        value: f64,
1004        bucket_bounds: &[f64],
1005    ) {
1006        if !value.is_finite() {
1007            return;
1008        }
1009        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1010        let histogram = histograms
1011            .entry((name.to_string(), labels))
1012            .or_insert_with(|| HistogramMetric {
1013                buckets: bucket_bounds
1014                    .iter()
1015                    .map(|bound| (prometheus_float(*bound), 0))
1016                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1017                    .collect(),
1018                count: 0,
1019                sum: 0.0,
1020            });
1021        histogram.count += 1;
1022        histogram.sum += value;
1023        for bound in bucket_bounds {
1024            if value <= *bound {
1025                let key = prometheus_float(*bound);
1026                *histogram.buckets.entry(key).or_default() += 1;
1027            }
1028        }
1029        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1030    }
1031
1032    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1033        let oldest_accepted_at_ms = self
1034            .pending_trigger_events
1035            .lock()
1036            .expect("pending trigger events poisoned")
1037            .get(&labels)
1038            .and_then(|events| events.values().min().copied());
1039        let age_seconds = oldest_accepted_at_ms
1040            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1041            .unwrap_or(0.0);
1042        self.set_gauge(
1043            "harn_trigger_oldest_pending_age_seconds",
1044            labels,
1045            age_seconds,
1046        );
1047    }
1048
1049    fn render_generic_metrics(&self, rendered: &mut String) {
1050        let counters = self
1051            .counters
1052            .lock()
1053            .expect("metrics counters poisoned")
1054            .clone();
1055        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1056        let histograms = self
1057            .histograms
1058            .lock()
1059            .expect("metrics histograms poisoned")
1060            .clone();
1061
1062        for name in metric_family_names(MetricKind::Counter) {
1063            rendered.push_str("# TYPE ");
1064            rendered.push_str(name);
1065            rendered.push_str(" counter\n");
1066            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1067                render_sample(rendered, sample_name, labels, *value);
1068            }
1069        }
1070        for name in metric_family_names(MetricKind::Gauge) {
1071            rendered.push_str("# TYPE ");
1072            rendered.push_str(name);
1073            rendered.push_str(" gauge\n");
1074            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1075                render_sample(rendered, sample_name, labels, *value);
1076            }
1077        }
1078        for name in metric_family_names(MetricKind::Histogram) {
1079            rendered.push_str("# TYPE ");
1080            rendered.push_str(name);
1081            rendered.push_str(" histogram\n");
1082            for ((sample_name, labels), histogram) in
1083                histograms.iter().filter(|((n, _), _)| n == name)
1084            {
1085                for (le, value) in &histogram.buckets {
1086                    let mut bucket_labels = labels.clone();
1087                    bucket_labels.insert("le".to_string(), le.clone());
1088                    render_sample(
1089                        rendered,
1090                        &format!("{sample_name}_bucket"),
1091                        &bucket_labels,
1092                        *value as f64,
1093                    );
1094                }
1095                render_sample(
1096                    rendered,
1097                    &format!("{sample_name}_sum"),
1098                    labels,
1099                    histogram.sum,
1100                );
1101                render_sample(
1102                    rendered,
1103                    &format!("{sample_name}_count"),
1104                    labels,
1105                    histogram.count as f64,
1106                );
1107            }
1108        }
1109    }
1110}
1111
/// Which of the three Prometheus sample families a metric belongs to.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Registered metric family names for `kind`, in render order.
///
/// Only names listed here are emitted by `render_generic_metrics`; adding a
/// new metric requires adding its family name to the matching table.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTERS: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
    ];
    const GAUGES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
    ];
    const HISTOGRAMS: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];
    match kind {
        MetricKind::Counter => COUNTERS,
        MetricKind::Gauge => GAUGES,
        MetricKind::Histogram => HISTOGRAMS,
    }
}
1164
1165fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1166    pairs
1167        .into_iter()
1168        .map(|(name, value)| (name.to_string(), value.to_string()))
1169        .collect()
1170}
1171
1172fn trigger_lifecycle_labels(
1173    trigger_id: &str,
1174    binding_key: &str,
1175    provider: &str,
1176    tenant_id: Option<&str>,
1177    status: &str,
1178) -> MetricLabels {
1179    labels([
1180        ("binding_key", binding_key),
1181        ("provider", provider),
1182        ("status", status),
1183        ("tenant_id", tenant_label(tenant_id)),
1184        ("trigger_id", trigger_id),
1185    ])
1186}
1187
1188fn trigger_pending_labels(
1189    trigger_id: &str,
1190    binding_key: &str,
1191    provider: &str,
1192    tenant_id: Option<&str>,
1193) -> MetricLabels {
1194    labels([
1195        ("binding_key", binding_key),
1196        ("provider", provider),
1197        ("tenant_id", tenant_label(tenant_id)),
1198        ("trigger_id", trigger_id),
1199    ])
1200}
1201
/// Map an optional tenant id to its metric label, collapsing missing or
/// blank (whitespace-only) values to the literal "none".
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1208
/// Non-negative wall-clock difference between two millisecond timestamps.
/// Returns zero when `earlier_ms` is not actually earlier than `later_ms`.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(delta_ms).unwrap_or(0))
}
1212
1213fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1214    rendered.push_str(name);
1215    if !labels.is_empty() {
1216        rendered.push('{');
1217        for (index, (label, label_value)) in labels.iter().enumerate() {
1218            if index > 0 {
1219                rendered.push(',');
1220            }
1221            rendered.push_str(label);
1222            rendered.push_str("=\"");
1223            rendered.push_str(&escape_label_value(label_value));
1224            rendered.push('"');
1225        }
1226        rendered.push('}');
1227    }
1228    rendered.push(' ');
1229    rendered.push_str(&prometheus_float(value));
1230    rendered.push('\n');
1231}
1232
/// Escape a Prometheus label value per the text exposition format:
/// backslash, double quote, and line feed must be backslash-escaped.
///
/// Builds directly into one `String` buffer instead of allocating a
/// `Vec<char>` per character as the previous `flat_map` version did.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1244
/// Format an `f64` for the Prometheus text exposition format.
///
/// Integral values render without a decimal point, other finite values with
/// up to six fractional digits (trailing zeros trimmed). The special values
/// render with their canonical exposition spellings `+Inf`, `-Inf`, and
/// `NaN`; previously negative infinity fell through to the generic `{:.6}`
/// path and rendered as the non-canonical `-inf`.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1259
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Logical schema name (e.g. "raw") identifying the payload shape.
    pub harn_schema_name: String,
    /// Optional JSON Schema document; `Null` when the schema is name-only.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1267
1268impl ProviderPayloadSchema {
1269    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1270        Self {
1271            harn_schema_name: harn_schema_name.into(),
1272            json_schema,
1273        }
1274    }
1275
1276    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1277        Self::new(harn_schema_name, JsonValue::Null)
1278    }
1279}
1280
1281impl Default for ProviderPayloadSchema {
1282    fn default() -> Self {
1283        Self::named("raw")
1284    }
1285}
1286
/// High-level transport kind a connector supports.
///
/// Serialized transparently as its inner string (e.g. "webhook").
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1291
1292impl TriggerKind {
1293    pub fn new(value: impl Into<String>) -> Self {
1294        Self(value.into())
1295    }
1296
1297    pub fn as_str(&self) -> &str {
1298        self.0.as_str()
1299    }
1300}
1301
1302impl From<&str> for TriggerKind {
1303    fn from(value: &str) -> Self {
1304        Self::new(value)
1305    }
1306}
1307
1308impl From<String> for TriggerKind {
1309    fn from(value: String) -> Self {
1310        Self::new(value)
1311    }
1312}
1313
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding routes to.
    pub provider: ProviderId,
    /// Transport kind the binding uses (e.g. "webhook").
    pub kind: TriggerKind,
    /// Stable identifier for this binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` when the binding does no deduplication.
    /// NOTE(review): presumably consumed by the inbox index — confirm.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Retention window for dedupe entries, in days; defaults to the
    /// catalog-wide inbox retention value.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration blob; `Null` when unset.
    #[serde(default)]
    pub config: JsonValue,
}
1327
1328impl TriggerBinding {
1329    pub fn new(
1330        provider: ProviderId,
1331        kind: impl Into<TriggerKind>,
1332        binding_id: impl Into<String>,
1333    ) -> Self {
1334        Self {
1335            provider,
1336            kind: kind.into(),
1337            binding_id: binding_id.into(),
1338            dedupe_key: None,
1339            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1340            config: JsonValue::Null,
1341        }
1342    }
1343}
1344
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1348
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    /// Bindings grouped by provider, kept in registration order per provider.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1354
1355impl TriggerRegistry {
1356    pub fn register(&mut self, binding: TriggerBinding) {
1357        self.bindings
1358            .entry(binding.provider.clone())
1359            .or_default()
1360            .push(binding);
1361    }
1362
1363    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1364        &self.bindings
1365    }
1366
1367    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1368        self.bindings
1369            .get(provider)
1370            .map(Vec::as_slice)
1371            .unwrap_or(&[])
1372    }
1373}
1374
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of trigger bindings handed to the connector.
    pub binding_count: usize,
}
1381
1382impl ActivationHandle {
1383    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1384        Self {
1385            provider,
1386            binding_count,
1387        }
1388    }
1389}
1390
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Transport kind label for this payload (e.g. "webhook").
    pub kind: String,
    /// Inbound request headers, keyed by header name.
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters, keyed by parameter name.
    pub query: BTreeMap<String, String>,
    /// Raw request body bytes, untouched by any decoding.
    pub body: Vec<u8>,
    /// When this process accepted the payload.
    pub received_at: OffsetDateTime,
    /// Provider-reported event time, when the provider supplies one.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant attribution, when known at ingest time.
    pub tenant_id: Option<TenantId>,
    /// Free-form transport metadata; `Null` when absent.
    pub metadata: JsonValue,
}
1403
1404impl RawInbound {
1405    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1406        Self {
1407            kind: kind.into(),
1408            headers,
1409            query: BTreeMap::new(),
1410            body,
1411            received_at: clock::now_utc(),
1412            occurred_at: None,
1413            tenant_id: None,
1414            metadata: JsonValue::Null,
1415        }
1416    }
1417
1418    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1419        Ok(serde_json::from_slice(&self.body)?)
1420    }
1421}
1422
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens the bucket can hold (burst size).
    pub capacity: u32,
    /// Tokens credited back every `refill_interval`.
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` are credited.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// 60-token burst capacity refilled at one token per second.
    fn default() -> Self {
        RateLimitConfig {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1440
/// Mutable token-bucket state for one (provider, key) rate-limit scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional token balance; one whole token is consumed per acquire.
    tokens: f64,
    // Instant of the last refill, used to credit elapsed time.
    last_refill: ClockInstant,
}
1446
1447impl TokenBucket {
1448    fn full(config: RateLimitConfig) -> Self {
1449        Self {
1450            tokens: config.capacity as f64,
1451            last_refill: clock::instant_now(),
1452        }
1453    }
1454
1455    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1456        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1457        let rate = config.refill_tokens.max(1) as f64 / interval;
1458        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1459        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1460        self.last_refill = now;
1461    }
1462
1463    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1464        self.refill(config, now);
1465        if self.tokens >= 1.0 {
1466            self.tokens -= 1.0;
1467            true
1468        } else {
1469            false
1470        }
1471    }
1472
1473    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1474        if self.tokens >= 1.0 {
1475            return StdDuration::ZERO;
1476        }
1477        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1478        let rate = config.refill_tokens.max(1) as f64 / interval;
1479        let missing = (1.0 - self.tokens).max(0.0);
1480        StdDuration::from_secs_f64((missing / rate).max(0.001))
1481    }
1482}
1483
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    /// Bucket parameters shared by every (provider, key) scope.
    config: RateLimitConfig,
    /// Lazily created buckets keyed by (provider id, caller-chosen key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1490
1491impl RateLimiterFactory {
1492    pub fn new(config: RateLimitConfig) -> Self {
1493        Self {
1494            config,
1495            buckets: Mutex::new(HashMap::new()),
1496        }
1497    }
1498
1499    pub fn config(&self) -> RateLimitConfig {
1500        self.config
1501    }
1502
1503    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1504        ScopedRateLimiter {
1505            factory: self,
1506            provider: provider.clone(),
1507            key: key.into(),
1508        }
1509    }
1510
1511    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1512        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1513        let bucket = buckets
1514            .entry((provider.as_str().to_string(), key.to_string()))
1515            .or_insert_with(|| TokenBucket::full(self.config));
1516        bucket.try_acquire(self.config, clock::instant_now())
1517    }
1518
1519    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1520        loop {
1521            let wait = {
1522                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1523                let bucket = buckets
1524                    .entry((provider.as_str().to_string(), key.to_string()))
1525                    .or_insert_with(|| TokenBucket::full(self.config));
1526                if bucket.try_acquire(self.config, clock::instant_now()) {
1527                    return;
1528                }
1529                bucket.wait_duration(self.config)
1530            };
1531            tokio::time::sleep(wait).await;
1532        }
1533    }
1534}
1535
1536impl Default for RateLimiterFactory {
1537    fn default() -> Self {
1538        Self::new(RateLimitConfig::default())
1539    }
1540}
1541
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    /// Factory owning the underlying buckets.
    factory: &'a RateLimiterFactory,
    /// Provider this scope is pinned to.
    provider: ProviderId,
    /// Caller-chosen sub-key within the provider.
    key: String,
}
1549
1550impl<'a> ScopedRateLimiter<'a> {
1551    pub fn try_acquire(&self) -> bool {
1552        self.factory.try_acquire(&self.provider, &self.key)
1553    }
1554
1555    pub async fn acquire(&self) {
1556        self.factory.acquire(&self.provider, &self.key).await;
1557    }
1558}
1559
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    /// One shared, async-lockable connector per provider.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1564
1565impl ConnectorRegistry {
1566    pub fn empty() -> Self {
1567        Self {
1568            connectors: BTreeMap::new(),
1569        }
1570    }
1571
1572    pub fn with_defaults() -> Self {
1573        let mut registry = Self::empty();
1574        for provider in registered_provider_metadata() {
1575            registry
1576                .register(default_connector_for_provider(&provider))
1577                .expect("default connector registration should not fail");
1578        }
1579        registry
1580    }
1581
1582    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1583        let provider = connector.provider_id().clone();
1584        if self.connectors.contains_key(&provider) {
1585            return Err(ConnectorError::DuplicateProvider(provider.0));
1586        }
1587        self.connectors
1588            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1589        Ok(())
1590    }
1591
1592    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1593        self.connectors.get(id).cloned()
1594    }
1595
1596    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1597        self.connectors.remove(id)
1598    }
1599
1600    pub fn list(&self) -> Vec<ProviderId> {
1601        self.connectors.keys().cloned().collect()
1602    }
1603
1604    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1605        for connector in self.connectors.values() {
1606            connector.lock().await.init(ctx.clone()).await?;
1607        }
1608        Ok(())
1609    }
1610
1611    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1612        let mut clients = BTreeMap::new();
1613        for (provider, connector) in &self.connectors {
1614            let client = connector.lock().await.client();
1615            clients.insert(provider.clone(), client);
1616        }
1617        clients
1618    }
1619
1620    pub async fn activate_all(
1621        &self,
1622        registry: &TriggerRegistry,
1623    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1624        let mut handles = Vec::new();
1625        for (provider, connector) in &self.connectors {
1626            let bindings = registry.bindings_for(provider);
1627            if bindings.is_empty() {
1628                continue;
1629            }
1630            let connector = connector.lock().await;
1631            handles.push(connector.activate(bindings).await?);
1632        }
1633        Ok(handles)
1634    }
1635}
1636
1637impl Default for ConnectorRegistry {
1638    fn default() -> Self {
1639        Self::with_defaults()
1640    }
1641}
1642
1643fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1644    // The provider catalog on main registers `github` with
1645    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1646    // before a native connector existed the catalog auto-wired a
1647    // GenericWebhookConnector. Now that #170 lands a first-class
1648    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1649    // provider_id "github" here and return the native connector instead of a
1650    // webhook-backed fallback. This keeps manifests that say
1651    // `provider = "github"` pointed at the new connector without requiring
1652    // users to switch to a distinct provider_id.
1653    if provider.provider == "github" {
1654        return Box::new(GitHubConnector::new());
1655    }
1656    if provider.provider == "linear" {
1657        return Box::new(LinearConnector::new());
1658    }
1659    if provider.provider == "slack" {
1660        return Box::new(SlackConnector::new());
1661    }
1662    if provider.provider == "notion" {
1663        return Box::new(NotionConnector::new());
1664    }
1665    if provider.provider == "a2a-push" {
1666        return Box::new(A2aPushConnector::new());
1667    }
1668    match &provider.runtime {
1669        ProviderRuntimeMetadata::Builtin {
1670            connector,
1671            default_signature_variant,
1672        } => match connector.as_str() {
1673            "cron" => Box::new(CronConnector::new()),
1674            "stream" => Box::new(StreamConnector::new(
1675                ProviderId::from(provider.provider.clone()),
1676                provider.schema_name.clone(),
1677            )),
1678            "webhook" => {
1679                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1680                    .expect("catalog webhook signature variant must be valid");
1681                Box::new(GenericWebhookConnector::with_profile(
1682                    WebhookProviderProfile::new(
1683                        ProviderId::from(provider.provider.clone()),
1684                        provider.schema_name.clone(),
1685                        variant,
1686                    ),
1687                ))
1688            }
1689            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1690        },
1691        ProviderRuntimeMetadata::Placeholder => {
1692            Box::new(PlaceholderConnector::from_metadata(provider))
1693        }
1694    }
1695}
1696
/// Fallback connector for providers cataloged without a concrete implementation.
struct PlaceholderConnector {
    // Catalog provider id this placeholder answers for.
    provider_id: ProviderId,
    // Trigger kinds declared in the catalog entry.
    kinds: Vec<TriggerKind>,
    // Payload schema name declared in the catalog entry.
    schema_name: String,
}
1702
1703impl PlaceholderConnector {
1704    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1705        Self {
1706            provider_id: ProviderId::from(metadata.provider.clone()),
1707            kinds: metadata
1708                .kinds
1709                .iter()
1710                .cloned()
1711                .map(TriggerKind::from)
1712                .collect(),
1713            schema_name: metadata.schema_name.clone(),
1714        }
1715    }
1716}
1717
/// Client stub that rejects every outbound method call.
struct PlaceholderClient;
1719
1720#[async_trait]
1721impl ConnectorClient for PlaceholderClient {
1722    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1723        Err(ClientError::Other(format!(
1724            "connector client method '{method}' is not implemented for this provider"
1725        )))
1726    }
1727}
1728
1729#[async_trait]
1730impl Connector for PlaceholderConnector {
1731    fn provider_id(&self) -> &ProviderId {
1732        &self.provider_id
1733    }
1734
1735    fn kinds(&self) -> &[TriggerKind] {
1736        &self.kinds
1737    }
1738
1739    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1740        Ok(())
1741    }
1742
1743    async fn activate(
1744        &self,
1745        bindings: &[TriggerBinding],
1746    ) -> Result<ActivationHandle, ConnectorError> {
1747        Ok(ActivationHandle::new(
1748            self.provider_id.clone(),
1749            bindings.len(),
1750        ))
1751    }
1752
1753    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1754        Err(ConnectorError::Unsupported(format!(
1755            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1756            self.provider_id.as_str()
1757        )))
1758    }
1759
1760    fn payload_schema(&self) -> ProviderPayloadSchema {
1761        ProviderPayloadSchema::named(self.schema_name.clone())
1762    }
1763
1764    fn client(&self) -> Arc<dyn ConnectorClient> {
1765        Arc::new(PlaceholderClient)
1766    }
1767}
1768
1769pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1770    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1771        *slot.borrow_mut() = clients
1772            .into_iter()
1773            .map(|(provider, client)| (provider.as_str().to_string(), client))
1774            .collect();
1775    });
1776}
1777
1778pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1779    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1780}
1781
1782pub fn clear_active_connector_clients() {
1783    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1784}
1785
1786#[cfg(test)]
1787mod tests {
1788    use super::*;
1789
1790    use std::sync::atomic::{AtomicUsize, Ordering};
1791
1792    use async_trait::async_trait;
1793    use serde_json::json;
1794
    /// Stub client whose `call` just echoes the requested method name.
    struct NoopClient;
1796
1797    #[async_trait]
1798    impl ConnectorClient for NoopClient {
1799        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1800            Ok(json!({ "method": method }))
1801        }
1802    }
1803
    /// Minimal connector used by registry tests.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        // Shared counter incremented on every `activate` invocation.
        activate_calls: Arc<AtomicUsize>,
    }
1809
1810    impl FakeConnector {
1811        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
1812            Self {
1813                provider_id: ProviderId::from(provider_id),
1814                kinds: vec![TriggerKind::from("webhook")],
1815                activate_calls,
1816            }
1817        }
1818    }
1819
1820    #[async_trait]
1821    impl Connector for FakeConnector {
1822        fn provider_id(&self) -> &ProviderId {
1823            &self.provider_id
1824        }
1825
1826        fn kinds(&self) -> &[TriggerKind] {
1827            &self.kinds
1828        }
1829
1830        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1831            Ok(())
1832        }
1833
1834        async fn activate(
1835            &self,
1836            bindings: &[TriggerBinding],
1837        ) -> Result<ActivationHandle, ConnectorError> {
1838            self.activate_calls.fetch_add(1, Ordering::SeqCst);
1839            Ok(ActivationHandle::new(
1840                self.provider_id.clone(),
1841                bindings.len(),
1842            ))
1843        }
1844
1845        async fn normalize_inbound(
1846            &self,
1847            _raw: RawInbound,
1848        ) -> Result<TriggerEvent, ConnectorError> {
1849            Err(ConnectorError::Unsupported(
1850                "not needed for registry tests".to_string(),
1851            ))
1852        }
1853
1854        fn payload_schema(&self) -> ProviderPayloadSchema {
1855            ProviderPayloadSchema::named("FakePayload")
1856        }
1857
1858        fn client(&self) -> Arc<dyn ConnectorClient> {
1859            Arc::new(NoopClient)
1860        }
1861    }
1862
1863    #[tokio::test]
1864    async fn connector_registry_rejects_duplicate_providers() {
1865        let activate_calls = Arc::new(AtomicUsize::new(0));
1866        let mut registry = ConnectorRegistry::empty();
1867        registry
1868            .register(Box::new(FakeConnector::new(
1869                "github",
1870                activate_calls.clone(),
1871            )))
1872            .unwrap();
1873
1874        let error = registry
1875            .register(Box::new(FakeConnector::new("github", activate_calls)))
1876            .unwrap_err();
1877        assert!(matches!(
1878            error,
1879            ConnectorError::DuplicateProvider(provider) if provider == "github"
1880        ));
1881    }
1882
1883    #[tokio::test]
1884    async fn connector_registry_activates_only_bound_connectors() {
1885        let github_calls = Arc::new(AtomicUsize::new(0));
1886        let slack_calls = Arc::new(AtomicUsize::new(0));
1887        let mut registry = ConnectorRegistry::empty();
1888        registry
1889            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1890            .unwrap();
1891        registry
1892            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1893            .unwrap();
1894
1895        let mut trigger_registry = TriggerRegistry::default();
1896        trigger_registry.register(TriggerBinding::new(
1897            ProviderId::from("github"),
1898            "webhook",
1899            "github.push",
1900        ));
1901        trigger_registry.register(TriggerBinding::new(
1902            ProviderId::from("github"),
1903            "webhook",
1904            "github.installation",
1905        ));
1906
1907        let handles = registry.activate_all(&trigger_registry).await.unwrap();
1908        assert_eq!(handles.len(), 1);
1909        assert_eq!(handles[0].provider.as_str(), "github");
1910        assert_eq!(handles[0].binding_count, 2);
1911        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
1912        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
1913    }
1914
1915    #[test]
1916    fn rate_limiter_scopes_tokens_by_provider_and_key() {
1917        let factory = RateLimiterFactory::new(RateLimitConfig {
1918            capacity: 1,
1919            refill_tokens: 1,
1920            refill_interval: StdDuration::from_secs(60),
1921        });
1922
1923        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1924        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1925        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1926        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1927    }
1928
1929    #[test]
1930    fn raw_inbound_json_body_preserves_raw_bytes() {
1931        let raw = RawInbound::new(
1932            "push",
1933            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1934            br#"{"ok":true}"#.to_vec(),
1935        );
1936
1937        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1938    }
1939
1940    #[test]
1941    fn connector_registry_lists_catalog_providers() {
1942        let registry = ConnectorRegistry::default();
1943        let providers = registry.list();
1944        assert!(providers.contains(&ProviderId::from("cron")));
1945        assert!(providers.contains(&ProviderId::from("github")));
1946        assert!(providers.contains(&ProviderId::from("webhook")));
1947    }
1948
1949    #[test]
1950    fn metrics_registry_exports_orchestrator_metric_families() {
1951        let metrics = MetricsRegistry::default();
1952        metrics.record_http_request(
1953            "/triggers/github",
1954            "POST",
1955            200,
1956            StdDuration::from_millis(25),
1957            512,
1958        );
1959        metrics.record_trigger_received("github-new-issue", "github");
1960        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
1961        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
1962        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
1963        metrics.record_trigger_retry("github-new-issue", 2);
1964        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
1965        metrics.set_trigger_inflight("github-new-issue", 0);
1966        metrics.record_event_log_append(
1967            "orchestrator.triggers.pending",
1968            StdDuration::from_millis(1),
1969            2048,
1970        );
1971        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
1972        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
1973        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
1974        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
1975        metrics.set_worker_queue_depth("triage", 1);
1976        metrics.record_worker_queue_claim_age("triage", 3.0);
1977        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
1978        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
1979        metrics.record_orchestrator_pump_admission_delay(
1980            "trigger.inbox.envelopes",
1981            StdDuration::from_millis(50),
1982        );
1983        metrics.record_trigger_accepted_to_normalized(
1984            "github-new-issue",
1985            "github-new-issue@v7",
1986            "github",
1987            Some("tenant-a"),
1988            "normalized",
1989            StdDuration::from_millis(25),
1990        );
1991        metrics.record_trigger_accepted_to_queue_append(
1992            "github-new-issue",
1993            "github-new-issue@v7",
1994            "github",
1995            Some("tenant-a"),
1996            "queued",
1997            StdDuration::from_millis(40),
1998        );
1999        metrics.record_trigger_queue_age_at_dispatch_admission(
2000            "github-new-issue",
2001            "github-new-issue@v7",
2002            "github",
2003            Some("tenant-a"),
2004            "admitted",
2005            StdDuration::from_millis(75),
2006        );
2007        metrics.record_trigger_queue_age_at_dispatch_start(
2008            "github-new-issue",
2009            "github-new-issue@v7",
2010            "github",
2011            Some("tenant-a"),
2012            "started",
2013            StdDuration::from_millis(125),
2014        );
2015        metrics.record_trigger_dispatch_runtime(
2016            "github-new-issue",
2017            "github-new-issue@v7",
2018            "github",
2019            Some("tenant-a"),
2020            "succeeded",
2021            StdDuration::from_millis(250),
2022        );
2023        metrics.record_trigger_retry_delay(
2024            "github-new-issue",
2025            "github-new-issue@v7",
2026            "github",
2027            Some("tenant-a"),
2028            "scheduled",
2029            StdDuration::from_secs(2),
2030        );
2031        metrics.record_trigger_accepted_to_dlq(
2032            "github-new-issue",
2033            "github-new-issue@v7",
2034            "github",
2035            Some("tenant-a"),
2036            "retry_exhausted",
2037            StdDuration::from_secs(45),
2038        );
2039        metrics.record_backpressure_event("ingest", "reject");
2040        metrics.note_trigger_pending_event(
2041            "evt-1",
2042            "github-new-issue",
2043            "github-new-issue@v7",
2044            "github",
2045            Some("tenant-a"),
2046            1_000,
2047            4_000,
2048        );
2049        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
2050        metrics.record_llm_cache_hit("mock");
2051
2052        let rendered = metrics.render_prometheus();
2053        for needle in [
2054            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
2055            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
2056            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
2057            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
2058            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
2059            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
2060            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
2061            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
2062            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
2063            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
2064            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
2065            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
2066            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
2067            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
2068            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
2069            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
2070            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
2071            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
2072            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
2073            "harn_worker_queue_depth{queue=\"triage\"} 1",
2074            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
2075            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
2076            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
2077            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
2078            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2079            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2080            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2081            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2082            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2083            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2084            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2085            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
2086            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
2087            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
2088            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
2089        ] {
2090            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
2091        }
2092    }
2093}