// harn_vm/connectors/mod.rs
1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37pub mod stream;
38#[cfg(test)]
39pub(crate) mod test_util;
40pub mod testkit;
41pub mod webhook;
42
43pub use a2a_push::A2aPushConnector;
44pub use cron::{CatchupMode, CronConnector};
45pub use github::GitHubConnector;
46pub use harn_module::{
47    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
48};
49pub use hmac::{
50    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
51    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
52    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
53    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
54    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
55    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
56    SIGNATURE_VERIFY_AUDIT_TOPIC,
57};
58pub use linear::LinearConnector;
59pub use notion::{
60    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
61};
62pub use slack::SlackConnector;
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
/// Shared owned handle to a connector instance registered with the runtime.
///
/// The async mutex serializes access to the boxed connector so `&mut self`
/// operations such as [`Connector::init`] can run behind a shared handle.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Thread-local registry of outbound connector clients keyed by a string id.
    // NOTE(review): the readers/writers of this map are outside this chunk;
    // the thread-local scoping presumably pins clients to the executing
    // handler thread — confirm against the call sites before relying on it.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
74
75/// Provider implementation contract for inbound connectors.
76#[async_trait]
77pub trait Connector: Send + Sync {
78    /// Stable provider id such as `github`, `slack`, or `webhook`.
79    fn provider_id(&self) -> &ProviderId;
80
81    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
82    fn kinds(&self) -> &[TriggerKind];
83
84    /// Called once per connector instance at orchestrator startup.
85    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
86
87    /// Activate the bindings relevant to this connector instance.
88    async fn activate(
89        &self,
90        bindings: &[TriggerBinding],
91    ) -> Result<ActivationHandle, ConnectorError>;
92
93    /// Stop connector-owned background work and flush any connector-local state.
94    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
95        Ok(())
96    }
97
98    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
99    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
100
101    /// Verify + normalize a provider-native inbound request into the richer
102    /// connector result contract used by ack-first webhook adapters.
103    async fn normalize_inbound_result(
104        &self,
105        raw: RawInbound,
106    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
107        self.normalize_inbound(raw)
108            .await
109            .map(ConnectorNormalizeResult::event)
110    }
111
112    /// Payload schema surfaced to future trigger-type narrowing.
113    fn payload_schema(&self) -> ProviderPayloadSchema;
114
115    /// Outbound API wrapper exposed to handlers.
116    fn client(&self) -> Arc<dyn ConnectorClient>;
117}
118
/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code to send back to the provider.
    pub status: u16,
    /// Response headers; the ordered map keeps iteration deterministic.
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
126
127impl ConnectorHttpResponse {
128    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
129        Self {
130            status,
131            headers,
132            body,
133        }
134    }
135}
136
/// Normalized inbound result accepted by the runtime connector adapter.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one trigger event (boxed to keep the enum small).
    Event(Box<TriggerEvent>),
    /// Zero or more trigger events produced by a single inbound request.
    Batch(Vec<TriggerEvent>),
    /// Respond to the provider immediately while still dispatching `events`.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the request with the given response; no events are dispatched
    /// (see `into_events`, which yields an empty vector for this variant).
    Reject(ConnectorHttpResponse),
}
148
149impl ConnectorNormalizeResult {
150    pub fn event(event: TriggerEvent) -> Self {
151        Self::Event(Box::new(event))
152    }
153
154    pub fn into_events(self) -> Vec<TriggerEvent> {
155        match self {
156            Self::Event(event) => vec![*event],
157            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
158            Self::Reject(_) => Vec::new(),
159        }
160    }
161}
162
/// Outcome of shared post-normalization bookkeeping for a single event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed (or skipped) dedupe and is ready for dispatch.
    Ready(Box<TriggerEvent>),
    /// The event's dedupe key was already claimed; the event was dropped.
    DuplicateDropped,
}
168
169pub async fn postprocess_normalized_event(
170    inbox: &InboxIndex,
171    binding_id: &str,
172    dedupe_enabled: bool,
173    dedupe_ttl: StdDuration,
174    mut event: TriggerEvent,
175) -> Result<PostNormalizeOutcome, ConnectorError> {
176    if dedupe_enabled && !event.dedupe_claimed() {
177        if !inbox
178            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
179            .await?
180        {
181            return Ok(PostNormalizeOutcome::DuplicateDropped);
182        }
183        event.mark_dedupe_claimed();
184    }
185
186    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
187}
188
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke provider method `method` with JSON `args` and return the
    /// provider's JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
194
/// Minimal outbound client errors shared by connector implementations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method name is not exposed by the client.
    MethodNotFound(String),
    /// The supplied arguments failed validation.
    InvalidArgs(String),
    /// The provider throttled the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Anything that does not fit the variants above.
    Other(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant wraps a preformatted message; surface it verbatim.
        // Delegating to `String`'s Display keeps formatter flags working.
        let message = match self {
            Self::MethodNotFound(message)
            | Self::InvalidArgs(message)
            | Self::RateLimited(message)
            | Self::Transport(message)
            | Self::Other(message) => message,
        };
        message.fmt(f)
    }
}

impl std::error::Error for ClientError {}
218
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector provider with this id is already registered.
    DuplicateProvider(String),
    /// The same delivery was seen before (dedupe hit).
    DuplicateDelivery(String),
    /// No connector provider with this id is registered.
    UnknownProvider(String),
    /// A required request header was absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// A signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event log failure.
    EventLog(String),
    /// Error surfaced from the harn runtime.
    HarnRuntime(String),
    /// Wrapped outbound client error.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}
244
245impl ConnectorError {
246    pub fn invalid_signature(message: impl Into<String>) -> Self {
247        Self::InvalidSignature(message.into())
248    }
249}
250
251impl fmt::Display for ConnectorError {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        match self {
254            Self::DuplicateProvider(provider) => {
255                write!(f, "connector provider `{provider}` is already registered")
256            }
257            Self::DuplicateDelivery(message) => message.fmt(f),
258            Self::UnknownProvider(provider) => {
259                write!(f, "connector provider `{provider}` is not registered")
260            }
261            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
262            Self::InvalidHeader { name, detail } => {
263                write!(f, "invalid header `{name}`: {detail}")
264            }
265            Self::InvalidSignature(message)
266            | Self::Json(message)
267            | Self::Secret(message)
268            | Self::EventLog(message)
269            | Self::HarnRuntime(message)
270            | Self::Unsupported(message)
271            | Self::Activation(message) => message.fmt(f),
272            Self::TimestampOutOfWindow {
273                timestamp,
274                now,
275                window,
276            } => write!(
277                f,
278                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
279            ),
280            Self::Client(error) => error.fmt(f),
281        }
282    }
283}
284
285impl std::error::Error for ConnectorError {}
286
287impl From<crate::event_log::LogError> for ConnectorError {
288    fn from(value: crate::event_log::LogError) -> Self {
289        Self::EventLog(value.to_string())
290    }
291}
292
293impl From<crate::secrets::SecretError> for ConnectorError {
294    fn from(value: crate::secrets::SecretError) -> Self {
295        Self::Secret(value.to_string())
296    }
297}
298
299impl From<serde_json::Error> for ConnectorError {
300    fn from(value: serde_json::Error) -> Self {
301        Self::Json(value.to_string())
302    }
303}
304
305impl From<ClientError> for ConnectorError {
306    fn from(value: ClientError) -> Self {
307        Self::Client(value)
308    }
309}
310
/// Startup context shared with connector instances.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Event log handle for connector writes.
    pub event_log: Arc<AnyEventLog>,
    /// Secret material lookup.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound delivery claims.
    pub inbox: Arc<InboxIndex>,
    /// Connector-local metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory producing rate limiters for connector use.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
320
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Fields mirror the atomic counters in [`MetricsRegistry`] one-to-one; see
/// `MetricsRegistry::snapshot` for how it is populated.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
337
// Sorted label set (label name -> label value) identifying a metric series.
type MetricLabels = BTreeMap<String, String>;

// Histogram state: per-bucket observation counts plus running count and sum.
// NOTE(review): bucket keys are strings — presumably the upper-bound label
// used at render time; the populating code is outside this chunk, confirm.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
346
347static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
348
349pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
350    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
351    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
352}
353
354pub fn clear_active_metrics_registry() {
355    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
356        *slot.lock().expect("active metrics registry poisoned") = None;
357    }
358}
359
360pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
361    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
362        slot.lock()
363            .expect("active metrics registry poisoned")
364            .clone()
365    })
366}
367
/// Shared metrics surface for connector-local counters and timings.
///
/// Fixed-purpose hot-path counters are lock-free atomics; ad-hoc labelled
/// series (counters, gauges, histograms) live behind mutex-guarded maps
/// keyed by metric name plus label set.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Atomic counters mirrored one-to-one in `ConnectorMetricsSnapshot`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters keyed by caller-chosen name (`record_custom_counter`).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled series keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Outstanding trigger events: label set -> (event id -> accepted-at ms).
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
389
impl MetricsRegistry {
    /// Latency buckets (seconds) for short-lived operations.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Wider latency buckets (seconds) for trigger lifecycle stages, which can
    /// legitimately span minutes end-to-end.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Payload size buckets (bytes), 128 B up to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
398
399    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
400        ConnectorMetricsSnapshot {
401            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
402            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
403            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
404            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
405            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
406            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
407            linear_timestamp_rejections_total: self
408                .linear_timestamp_rejections_total
409                .load(Ordering::Relaxed),
410            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
411            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
412            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
413            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
414            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
415        }
416    }
417
    /// A dedupe claim was durably written to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// A duplicate was rejected via the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// A duplicate was rejected after consulting the durable index.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// `count` expired dedupe entries were reaped (zero counts are ignored).
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrite the gauge of currently live dedupe entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// A Linear webhook was rejected for a stale/out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// A trigger dispatch completed successfully.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// A trigger dispatch failed.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// A failed dispatch was queued for retry.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// A Slack delivery succeeded.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// A Slack delivery failed.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
473
474    pub fn record_custom_counter(&self, name: &str, amount: u64) {
475        if amount == 0 {
476            return;
477        }
478        let mut counters = self
479            .custom_counters
480            .lock()
481            .expect("custom counters poisoned");
482        *counters.entry(name.to_string()).or_default() += amount;
483    }
484
485    pub fn record_http_request(
486        &self,
487        endpoint: &str,
488        method: &str,
489        status: u16,
490        duration: StdDuration,
491        body_size_bytes: usize,
492    ) {
493        self.increment_counter(
494            "harn_http_requests_total",
495            labels([
496                ("endpoint", endpoint),
497                ("method", method),
498                ("status", &status.to_string()),
499            ]),
500            1,
501        );
502        self.observe_histogram(
503            "harn_http_request_duration_seconds",
504            labels([("endpoint", endpoint)]),
505            duration.as_secs_f64(),
506            &Self::DURATION_BUCKETS,
507        );
508        self.observe_histogram(
509            "harn_http_body_size_bytes",
510            labels([("endpoint", endpoint)]),
511            body_size_bytes as f64,
512            &Self::SIZE_BUCKETS,
513        );
514    }
515
    /// An inbound event reached the given trigger from `provider`.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// An inbound event was dropped as a duplicate, labelled with the reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// A trigger predicate was evaluated; records the boolean outcome and
    /// folds the evaluation cost (clamped at >= 0) into a USD histogram.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// A trigger dispatch finished, labelled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// A retry was attempted; `attempt` becomes a label (watch cardinality).
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// An event was shunted to the dead-letter queue for `reason`.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
584
585    pub fn record_trigger_accepted_to_normalized(
586        &self,
587        trigger_id: &str,
588        binding_key: &str,
589        provider: &str,
590        tenant_id: Option<&str>,
591        status: &str,
592        duration: StdDuration,
593    ) {
594        self.observe_histogram(
595            "harn_trigger_webhook_accepted_to_normalized_seconds",
596            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
597            duration.as_secs_f64(),
598            &Self::TRIGGER_LATENCY_BUCKETS,
599        );
600    }
601
602    pub fn record_trigger_accepted_to_queue_append(
603        &self,
604        trigger_id: &str,
605        binding_key: &str,
606        provider: &str,
607        tenant_id: Option<&str>,
608        status: &str,
609        duration: StdDuration,
610    ) {
611        self.observe_histogram(
612            "harn_trigger_webhook_accepted_to_queue_append_seconds",
613            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
614            duration.as_secs_f64(),
615            &Self::TRIGGER_LATENCY_BUCKETS,
616        );
617    }
618
619    pub fn record_trigger_queue_age_at_dispatch_admission(
620        &self,
621        trigger_id: &str,
622        binding_key: &str,
623        provider: &str,
624        tenant_id: Option<&str>,
625        status: &str,
626        age: StdDuration,
627    ) {
628        self.observe_histogram(
629            "harn_trigger_queue_age_at_dispatch_admission_seconds",
630            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
631            age.as_secs_f64(),
632            &Self::TRIGGER_LATENCY_BUCKETS,
633        );
634    }
635
636    pub fn record_trigger_queue_age_at_dispatch_start(
637        &self,
638        trigger_id: &str,
639        binding_key: &str,
640        provider: &str,
641        tenant_id: Option<&str>,
642        status: &str,
643        age: StdDuration,
644    ) {
645        self.observe_histogram(
646            "harn_trigger_queue_age_at_dispatch_start_seconds",
647            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
648            age.as_secs_f64(),
649            &Self::TRIGGER_LATENCY_BUCKETS,
650        );
651    }
652
653    pub fn record_trigger_dispatch_runtime(
654        &self,
655        trigger_id: &str,
656        binding_key: &str,
657        provider: &str,
658        tenant_id: Option<&str>,
659        status: &str,
660        duration: StdDuration,
661    ) {
662        self.observe_histogram(
663            "harn_trigger_dispatch_runtime_seconds",
664            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
665            duration.as_secs_f64(),
666            &Self::TRIGGER_LATENCY_BUCKETS,
667        );
668    }
669
670    pub fn record_trigger_retry_delay(
671        &self,
672        trigger_id: &str,
673        binding_key: &str,
674        provider: &str,
675        tenant_id: Option<&str>,
676        status: &str,
677        duration: StdDuration,
678    ) {
679        self.observe_histogram(
680            "harn_trigger_retry_delay_seconds",
681            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
682            duration.as_secs_f64(),
683            &Self::TRIGGER_LATENCY_BUCKETS,
684        );
685    }
686
687    pub fn record_trigger_accepted_to_dlq(
688        &self,
689        trigger_id: &str,
690        binding_key: &str,
691        provider: &str,
692        tenant_id: Option<&str>,
693        status: &str,
694        duration: StdDuration,
695    ) {
696        self.observe_histogram(
697            "harn_trigger_accepted_to_dlq_seconds",
698            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
699            duration.as_secs_f64(),
700            &Self::TRIGGER_LATENCY_BUCKETS,
701        );
702    }
703
704    pub fn note_trigger_pending_event(
705        &self,
706        event_id: &str,
707        trigger_id: &str,
708        binding_key: &str,
709        provider: &str,
710        tenant_id: Option<&str>,
711        accepted_at_ms: i64,
712        now_ms: i64,
713    ) {
714        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
715        {
716            let mut pending = self
717                .pending_trigger_events
718                .lock()
719                .expect("pending trigger events poisoned");
720            pending
721                .entry(labels.clone())
722                .or_default()
723                .insert(event_id.to_string(), accepted_at_ms);
724        }
725        self.refresh_oldest_pending_gauge(labels, now_ms);
726    }
727
728    pub fn clear_trigger_pending_event(
729        &self,
730        event_id: &str,
731        trigger_id: &str,
732        binding_key: &str,
733        provider: &str,
734        tenant_id: Option<&str>,
735        now_ms: i64,
736    ) {
737        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
738        {
739            let mut pending = self
740                .pending_trigger_events
741                .lock()
742                .expect("pending trigger events poisoned");
743            if let Some(events) = pending.get_mut(&labels) {
744                events.remove(event_id);
745                if events.is_empty() {
746                    pending.remove(&labels);
747                }
748            }
749        }
750        self.refresh_oldest_pending_gauge(labels, now_ms);
751    }
752
    /// Overwrite the in-flight dispatch gauge for `trigger_id`.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Overwrite today's spend gauge for `trigger_id` (clamped at >= 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// A trigger hit its budget limit; `strategy` labels how it was handled.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Backpressure kicked in on `dimension`, resolved by `action`.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
784
    /// Record an event-log append: latency histogram per topic plus a gauge
    /// of the appended payload size.
    /// NOTE(review): the gauge stores the *last* payload size, not a running
    /// topic total — confirm the metric name matches that intent.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Overwrite the consumer lag gauge for a (topic, consumer) pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record one agent-to-agent hop: outcome counter plus latency histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
825
    /// Overwrite the depth gauge for a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Record how long a claimed queue item had been waiting (clamped >= 0).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Overwrite the orchestrator pump backlog gauge for `topic`.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Overwrite the gauge of outstanding pump items for `topic`.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Record the delay before the pump admitted an item for `topic`.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }
867
868    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
869        self.increment_counter(
870            "harn_llm_calls_total",
871            labels([
872                ("provider", provider),
873                ("model", model),
874                ("outcome", outcome),
875            ]),
876            1,
877        );
878        if cost_usd > 0.0 {
879            self.increment_counter(
880                "harn_llm_cost_usd_total",
881                labels([("provider", provider), ("model", model)]),
882                cost_usd,
883            );
884        } else {
885            self.ensure_counter(
886                "harn_llm_cost_usd_total",
887                labels([("provider", provider), ("model", model)]),
888            );
889        }
890    }
891
892    pub fn record_llm_cache_hit(&self, provider: &str) {
893        self.increment_counter(
894            "harn_llm_cache_hits_total",
895            labels([("provider", provider)]),
896            1,
897        );
898    }
899
    /// Renders every tracked metric in the Prometheus text exposition format.
    ///
    /// Output order: legacy fixed-name counters from the snapshot, then
    /// free-form custom counters (namespaced and sanitized), then two
    /// hard-coded Slack auto-disable threshold gauges, then the generic
    /// counter/gauge/histogram families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Legacy counters surfaced via the snapshot struct rather than the
        // generic counter map.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize caller-supplied names so the exposition stays valid:
            // anything outside [A-Za-z0-9_] becomes '_'.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Fixed configuration gauges for the Slack auto-disable policy.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
964
965    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
966        let amount = amount.into();
967        if amount <= 0.0 || !amount.is_finite() {
968            return;
969        }
970        let mut counters = self.counters.lock().expect("metrics counters poisoned");
971        *counters.entry((name.to_string(), labels)).or_default() += amount;
972    }
973
974    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
975        let mut counters = self.counters.lock().expect("metrics counters poisoned");
976        counters.entry((name.to_string(), labels)).or_default();
977    }
978
979    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
980        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
981        gauges.insert((name.to_string(), labels), value);
982    }
983
    /// Folds one observation into the named histogram, creating the bucket
    /// layout from `bucket_bounds` on first use.
    ///
    /// Bucket counts are cumulative (Prometheus semantics): every bound
    /// `>= value` is incremented, plus the implicit `+Inf` bucket. Non-finite
    /// observations are dropped.
    ///
    /// NOTE(review): the bucket layout is fixed at first observation —
    /// assumes every call site uses the same `bucket_bounds` for a given
    /// metric name, otherwise later bounds get added with a zero backfill.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                // Pre-render each bound once so bucket keys match the
                // exposition format exactly.
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        // The +Inf bucket counts every observation.
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1016
1017    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1018        let oldest_accepted_at_ms = self
1019            .pending_trigger_events
1020            .lock()
1021            .expect("pending trigger events poisoned")
1022            .get(&labels)
1023            .and_then(|events| events.values().min().copied());
1024        let age_seconds = oldest_accepted_at_ms
1025            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1026            .unwrap_or(0.0);
1027        self.set_gauge(
1028            "harn_trigger_oldest_pending_age_seconds",
1029            labels,
1030            age_seconds,
1031        );
1032    }
1033
    /// Appends the generic counter/gauge/histogram families to `rendered`.
    ///
    /// Walks the static family-name tables so a `# TYPE` header is emitted
    /// once per family even when the family has no samples yet.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone each map under its lock so rendering happens lock-free.
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Cumulative buckets, each tagged with its `le` bound.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1095}
1096
/// Prometheus metric family categories used to index the static name tables
/// in `metric_family_names`.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1103
/// Static tables of every generic metric family name, per kind.
///
/// `render_generic_metrics` walks these so a `# TYPE` header appears for each
/// family even before it has samples; a new generic metric must be added here
/// to show up in scrapes.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1149
1150fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1151    pairs
1152        .into_iter()
1153        .map(|(name, value)| (name.to_string(), value.to_string()))
1154        .collect()
1155}
1156
/// Label set shared by trigger lifecycle metrics.
///
/// `status` distinguishes the lifecycle outcome; a missing or blank tenant
/// is normalized to the literal "none" by `tenant_label`.
fn trigger_lifecycle_labels(
    trigger_id: &str,
    binding_key: &str,
    provider: &str,
    tenant_id: Option<&str>,
    status: &str,
) -> MetricLabels {
    labels([
        ("binding_key", binding_key),
        ("provider", provider),
        ("status", status),
        ("tenant_id", tenant_label(tenant_id)),
        ("trigger_id", trigger_id),
    ])
}
1172
/// Label set for pending-trigger metrics: same shape as
/// `trigger_lifecycle_labels` minus the `status` dimension.
fn trigger_pending_labels(
    trigger_id: &str,
    binding_key: &str,
    provider: &str,
    tenant_id: Option<&str>,
) -> MetricLabels {
    labels([
        ("binding_key", binding_key),
        ("provider", provider),
        ("tenant_id", tenant_label(tenant_id)),
        ("trigger_id", trigger_id),
    ])
}
1186
/// Normalizes an optional tenant id for metric labels: trims whitespace and
/// maps a missing or blank id to the literal "none".
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id {
        Some(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                "none"
            } else {
                trimmed
            }
        }
        None => "none",
    }
}
1193
/// Clamped millisecond difference `later_ms - earlier_ms` as a duration;
/// negative deltas (clock skew or reordering) collapse to zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(delta_ms).unwrap_or(0))
}
1197
1198fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1199    rendered.push_str(name);
1200    if !labels.is_empty() {
1201        rendered.push('{');
1202        for (index, (label, label_value)) in labels.iter().enumerate() {
1203            if index > 0 {
1204                rendered.push(',');
1205            }
1206            rendered.push_str(label);
1207            rendered.push_str("=\"");
1208            rendered.push_str(&escape_label_value(label_value));
1209            rendered.push('"');
1210        }
1211        rendered.push('}');
1212    }
1213    rendered.push(' ');
1214    rendered.push_str(&prometheus_float(value));
1215    rendered.push('\n');
1216}
1217
/// Escapes a label value for the Prometheus text exposition format.
///
/// Backslash, double quote, and newline are the only characters the format
/// requires escaping inside a quoted label value; everything else passes
/// through unchanged.
fn escape_label_value(value: &str) -> String {
    // Single reusable buffer instead of a per-character Vec allocation
    // (the previous flat_map built a Vec<char> for every input char).
    // Escaping only ever grows the output, so input length is a good floor.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1229
/// Renders an `f64` the way the Prometheus text exposition format expects.
///
/// Special values use the spec spellings `+Inf`, `-Inf`, and `NaN`. Integral
/// values drop the decimal point; fractional values are printed with up to
/// six decimals and trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        // Rust's formatter already prints "NaN"; made explicit for clarity.
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Rust's default formatting yields "inf"/"-inf", but Prometheus
        // parsers expect "+Inf"/"-Inf". The old code only special-cased
        // positive infinity and emitted an invalid "-inf" otherwise.
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1244
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Logical harn schema name the payload conforms to (e.g. "raw").
    pub harn_schema_name: String,
    /// Optional embedded JSON Schema; `Null` when only the name is known.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1252
1253impl ProviderPayloadSchema {
1254    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1255        Self {
1256            harn_schema_name: harn_schema_name.into(),
1257            json_schema,
1258        }
1259    }
1260
1261    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1262        Self::new(harn_schema_name, JsonValue::Null)
1263    }
1264}
1265
1266impl Default for ProviderPayloadSchema {
1267    fn default() -> Self {
1268        Self::named("raw")
1269    }
1270}
1271
/// High-level transport kind a connector supports.
///
/// Serialized transparently as its inner string (e.g. "webhook").
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1276
1277impl TriggerKind {
1278    pub fn new(value: impl Into<String>) -> Self {
1279        Self(value.into())
1280    }
1281
1282    pub fn as_str(&self) -> &str {
1283        self.0.as_str()
1284    }
1285}
1286
1287impl From<&str> for TriggerKind {
1288    fn from(value: &str) -> Self {
1289        Self::new(value)
1290    }
1291}
1292
1293impl From<String> for TriggerKind {
1294    fn from(value: String) -> Self {
1295        Self::new(value)
1296    }
1297}
1298
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding routes to.
    pub provider: ProviderId,
    /// Transport kind the binding uses (e.g. "webhook").
    pub kind: TriggerKind,
    /// Stable identifier for this binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` when dedupe keys are not configured.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// How long dedupe records are retained, in days.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration payload; `Null` when absent.
    #[serde(default)]
    pub config: JsonValue,
}
1312
1313impl TriggerBinding {
1314    pub fn new(
1315        provider: ProviderId,
1316        kind: impl Into<TriggerKind>,
1317        binding_id: impl Into<String>,
1318    ) -> Self {
1319        Self {
1320            provider,
1321            kind: kind.into(),
1322            binding_id: binding_id.into(),
1323            dedupe_key: None,
1324            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1325            config: JsonValue::Null,
1326        }
1327    }
1328}
1329
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1333
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Grouped by provider; insertion order preserved within each group.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1339
1340impl TriggerRegistry {
1341    pub fn register(&mut self, binding: TriggerBinding) {
1342        self.bindings
1343            .entry(binding.provider.clone())
1344            .or_default()
1345            .push(binding);
1346    }
1347
1348    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1349        &self.bindings
1350    }
1351
1352    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1353        self.bindings
1354            .get(provider)
1355            .map(Vec::as_slice)
1356            .unwrap_or(&[])
1357    }
1358}
1359
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of trigger bindings handed to the connector.
    pub binding_count: usize,
}
1366
1367impl ActivationHandle {
1368    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1369        Self {
1370            provider,
1371            binding_count,
1372        }
1373    }
1374}
1375
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Transport kind the payload arrived over.
    pub kind: String,
    /// Request headers. NOTE(review): case normalization presumably happens
    /// upstream — confirm against the HTTP ingress layer.
    pub headers: BTreeMap<String, String>,
    /// URL query parameters.
    pub query: BTreeMap<String, String>,
    /// Unparsed request body bytes.
    pub body: Vec<u8>,
    /// When this process received the payload.
    pub received_at: OffsetDateTime,
    /// When the provider says the event occurred, if known.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant the payload was addressed to, if resolved.
    pub tenant_id: Option<TenantId>,
    /// Extra transport metadata; `Null` when absent.
    pub metadata: JsonValue,
}
1388
1389impl RawInbound {
1390    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1391        Self {
1392            kind: kind.into(),
1393            headers,
1394            query: BTreeMap::new(),
1395            body,
1396            received_at: clock::now_utc(),
1397            occurred_at: None,
1398            tenant_id: None,
1399            metadata: JsonValue::Null,
1400        }
1401    }
1402
1403    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1404        Ok(serde_json::from_slice(&self.body)?)
1405    }
1406}
1407
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum burst size (bucket capacity in tokens).
    pub capacity: u32,
    /// Tokens added per `refill_interval`.
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` are added.
    pub refill_interval: StdDuration,
}
1415
1416impl Default for RateLimitConfig {
1417    fn default() -> Self {
1418        Self {
1419            capacity: 60,
1420            refill_tokens: 1,
1421            refill_interval: StdDuration::from_secs(1),
1422        }
1423    }
1424}
1425
/// Mutable token-bucket state: fractional token count plus the instant of
/// the last refill.
#[derive(Clone, Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: ClockInstant,
}
1431
impl TokenBucket {
    /// A bucket starting at full capacity, stamped with the current instant.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Adds tokens accrued since the last refill, capped at capacity.
    ///
    /// Degenerate configs are floored (interval to `f64::EPSILON`,
    /// refill_tokens and capacity to 1) so the rate is never zero or NaN.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refills, then takes one token if at least one whole token is present.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimated wait until one token is available (does not refill).
    ///
    /// Floored at 1ms so callers polling in a loop never busy-spin.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1468
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    /// Bucket parameters shared by every scope.
    config: RateLimitConfig,
    /// Lazily created buckets keyed by (provider id, caller-chosen key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1475
impl RateLimiterFactory {
    /// A factory whose buckets all share `config`.
    pub fn new(config: RateLimitConfig) -> Self {
        Self {
            config,
            buckets: Mutex::new(HashMap::new()),
        }
    }

    /// The shared token-bucket configuration.
    pub fn config(&self) -> RateLimitConfig {
        self.config
    }

    /// Borrowed limiter view pinned to one (provider, key) scope.
    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
        ScopedRateLimiter {
            factory: self,
            provider: provider.clone(),
            key: key.into(),
        }
    }

    /// Non-blocking: takes a token from the (provider, key) bucket if one is
    /// available, creating a full bucket on first use.
    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
        let bucket = buckets
            .entry((provider.as_str().to_string(), key.to_string()))
            .or_insert_with(|| TokenBucket::full(self.config));
        bucket.try_acquire(self.config, clock::instant_now())
    }

    /// Awaits until a token can be taken from the (provider, key) bucket.
    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
        loop {
            // The std::sync mutex guard is confined to this block so it is
            // dropped before the sleep `.await` below.
            let wait = {
                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
                let bucket = buckets
                    .entry((provider.as_str().to_string(), key.to_string()))
                    .or_insert_with(|| TokenBucket::full(self.config));
                if bucket.try_acquire(self.config, clock::instant_now()) {
                    return;
                }
                bucket.wait_duration(self.config)
            };
            tokio::time::sleep(wait).await;
        }
    }
}
1520
1521impl Default for RateLimiterFactory {
1522    fn default() -> Self {
1523        Self::new(RateLimitConfig::default())
1524    }
1525}
1526
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Factory owning the shared bucket map and config.
    factory: &'a RateLimiterFactory,
    provider: ProviderId,
    key: String,
}
1534
1535impl<'a> ScopedRateLimiter<'a> {
1536    pub fn try_acquire(&self) -> bool {
1537        self.factory.try_acquire(&self.provider, &self.key)
1538    }
1539
1540    pub async fn acquire(&self) {
1541        self.factory.acquire(&self.provider, &self.key).await;
1542    }
1543}
1544
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    // Shared, async-lock-guarded connector handles (see `register`).
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1549
impl ConnectorRegistry {
    /// A registry with no connectors registered.
    pub fn empty() -> Self {
        Self {
            connectors: BTreeMap::new(),
        }
    }

    /// A registry pre-populated with a connector for every cataloged
    /// provider (see `default_connector_for_provider`).
    pub fn with_defaults() -> Self {
        let mut registry = Self::empty();
        for provider in registered_provider_metadata() {
            registry
                .register(default_connector_for_provider(&provider))
                .expect("default connector registration should not fail");
        }
        registry
    }

    /// Registers a connector; rejects a second connector for the same
    /// provider id.
    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
        let provider = connector.provider_id().clone();
        if self.connectors.contains_key(&provider) {
            return Err(ConnectorError::DuplicateProvider(provider.0));
        }
        self.connectors
            .insert(provider, Arc::new(AsyncMutex::new(connector)));
        Ok(())
    }

    /// Shared handle for one provider's connector, if registered.
    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.get(id).cloned()
    }

    /// Removes and returns one provider's connector handle.
    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.remove(id)
    }

    /// Registered provider ids, in sorted order.
    pub fn list(&self) -> Vec<ProviderId> {
        self.connectors.keys().cloned().collect()
    }

    /// Initializes every connector sequentially; stops at the first failure.
    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
        for connector in self.connectors.values() {
            connector.lock().await.init(ctx.clone()).await?;
        }
        Ok(())
    }

    /// Snapshot of each connector's outbound client, keyed by provider.
    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
        let mut clients = BTreeMap::new();
        for (provider, connector) in &self.connectors {
            let client = connector.lock().await.client();
            clients.insert(provider.clone(), client);
        }
        clients
    }

    /// Activates each connector that has at least one trigger binding;
    /// connectors with no bindings are skipped entirely.
    pub async fn activate_all(
        &self,
        registry: &TriggerRegistry,
    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
        let mut handles = Vec::new();
        for (provider, connector) in &self.connectors {
            let bindings = registry.bindings_for(provider);
            if bindings.is_empty() {
                continue;
            }
            let connector = connector.lock().await;
            handles.push(connector.activate(bindings).await?);
        }
        Ok(handles)
    }
}
1621
1622impl Default for ConnectorRegistry {
1623    fn default() -> Self {
1624        Self::with_defaults()
1625    }
1626}
1627
1628fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1629    // The provider catalog on main registers `github` with
1630    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1631    // before a native connector existed the catalog auto-wired a
1632    // GenericWebhookConnector. Now that #170 lands a first-class
1633    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1634    // provider_id "github" here and return the native connector instead of a
1635    // webhook-backed fallback. This keeps manifests that say
1636    // `provider = "github"` pointed at the new connector without requiring
1637    // users to switch to a distinct provider_id.
1638    if provider.provider == "github" {
1639        return Box::new(GitHubConnector::new());
1640    }
1641    if provider.provider == "linear" {
1642        return Box::new(LinearConnector::new());
1643    }
1644    if provider.provider == "slack" {
1645        return Box::new(SlackConnector::new());
1646    }
1647    if provider.provider == "notion" {
1648        return Box::new(NotionConnector::new());
1649    }
1650    if provider.provider == "a2a-push" {
1651        return Box::new(A2aPushConnector::new());
1652    }
1653    match &provider.runtime {
1654        ProviderRuntimeMetadata::Builtin {
1655            connector,
1656            default_signature_variant,
1657        } => match connector.as_str() {
1658            "cron" => Box::new(CronConnector::new()),
1659            "stream" => Box::new(StreamConnector::new(
1660                ProviderId::from(provider.provider.clone()),
1661                provider.schema_name.clone(),
1662            )),
1663            "webhook" => {
1664                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1665                    .expect("catalog webhook signature variant must be valid");
1666                Box::new(GenericWebhookConnector::with_profile(
1667                    WebhookProviderProfile::new(
1668                        ProviderId::from(provider.provider.clone()),
1669                        provider.schema_name.clone(),
1670                        variant,
1671                    ),
1672                ))
1673            }
1674            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1675        },
1676        ProviderRuntimeMetadata::Placeholder => {
1677            Box::new(PlaceholderConnector::from_metadata(provider))
1678        }
1679    }
1680}
1681
/// Fallback connector for cataloged providers without a concrete
/// implementation; inbound normalization always fails with `Unsupported`.
struct PlaceholderConnector {
    provider_id: ProviderId,
    kinds: Vec<TriggerKind>,
    schema_name: String,
}
1687
1688impl PlaceholderConnector {
1689    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1690        Self {
1691            provider_id: ProviderId::from(metadata.provider.clone()),
1692            kinds: metadata
1693                .kinds
1694                .iter()
1695                .cloned()
1696                .map(TriggerKind::from)
1697                .collect(),
1698            schema_name: metadata.schema_name.clone(),
1699        }
1700    }
1701}
1702
/// Client stub whose every method call reports "not implemented".
struct PlaceholderClient;
1704
1705#[async_trait]
1706impl ConnectorClient for PlaceholderClient {
1707    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1708        Err(ClientError::Other(format!(
1709            "connector client method '{method}' is not implemented for this provider"
1710        )))
1711    }
1712}
1713
#[async_trait]
impl Connector for PlaceholderConnector {
    fn provider_id(&self) -> &ProviderId {
        &self.provider_id
    }

    fn kinds(&self) -> &[TriggerKind] {
        &self.kinds
    }

    /// No-op: placeholders have nothing to initialize.
    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Accepts any bindings without doing real work, reporting the count.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError> {
        Ok(ActivationHandle::new(
            self.provider_id.clone(),
            bindings.len(),
        ))
    }

    /// Always fails: the provider is cataloged but has no real inbound
    /// normalization yet.
    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
        Err(ConnectorError::Unsupported(format!(
            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
            self.provider_id.as_str()
        )))
    }

    /// Name-only schema taken from the catalog metadata.
    fn payload_schema(&self) -> ProviderPayloadSchema {
        ProviderPayloadSchema::named(self.schema_name.clone())
    }

    /// Fresh stub client whose calls always fail.
    fn client(&self) -> Arc<dyn ConnectorClient> {
        Arc::new(PlaceholderClient)
    }
}
1753
1754pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1755    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1756        *slot.borrow_mut() = clients
1757            .into_iter()
1758            .map(|(provider, client)| (provider.as_str().to_string(), client))
1759            .collect();
1760    });
1761}
1762
1763pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1764    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1765}
1766
1767pub fn clear_active_connector_clients() {
1768    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1769}
1770
// Unit tests covering the connector registry, the per-provider rate limiter,
// raw-inbound JSON helpers, and the Prometheus rendering of the metrics
// registry.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal `ConnectorClient` stub: echoes the called method name back in a
    // JSON object. Used wherever a client is required but never exercised.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test double for `Connector`. Records `activate` invocations in a shared
    // atomic counter so tests can assert which connectors were activated.
    struct FakeConnector {
        // Provider identity this fake reports via `provider_id()`.
        provider_id: ProviderId,
        // Trigger kinds advertised by `kinds()`; always a single "webhook".
        kinds: Vec<TriggerKind>,
        // Shared counter incremented on every `activate` call.
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        // Builds a fake for `provider_id` sharing the supplied call counter.
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        // No-op init: registry tests never need connector-side setup.
        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Counts the call, then hands back a handle carrying the binding
        // count so tests can assert how many bindings were activated.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        // Normalization is out of scope for these tests, so reject outright.
        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering two connectors with the same provider id must fail with
    // `DuplicateProvider` naming the offending provider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // `activate_all` should activate only connectors that have at least one
    // trigger binding, passing all of that provider's bindings in one call.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        // Two bindings for github, none for slack.
        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        // Only github is activated — once, with both bindings; slack is idle.
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Token buckets must be independent per (provider, key) pair: draining one
    // scope must not affect a different key or a different provider.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        // Capacity of one token with a slow refill so the second acquire on
        // the same scope is guaranteed to fail within the test.
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body` should parse the stored raw bytes without altering them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is pre-populated from the built-in catalog; spot
    // check a few well-known providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // End-to-end smoke test for the metrics registry: record one sample into
    // every orchestrator metric family, then assert each expected Prometheus
    // line (with labels sorted and histogram buckets populated) appears in
    // the rendered exposition output.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        // HTTP-level request metrics.
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        // Trigger lifecycle counters and gauges.
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        // Event-log append/lag metrics.
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        // Budget, A2A, and worker-queue metrics.
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        // Orchestrator pump backlog/admission metrics.
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Stage-latency histograms, labeled by binding key / provider / tenant.
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        // Pending event at t=1s observed at t=4s — expect a 3s oldest-pending
        // age gauge below.
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        // LLM call accounting.
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Each needle is an exact exposition-format line expected in the
        // rendered output; the `le` bucket values pin histogram boundaries.
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}