// harn_vm/connectors/mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47    connector_export_denied_builtin_reason, connector_export_effect_class,
48    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61    SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
/// Shared owned handle to a connector instance registered with the runtime.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to an outbound connector client.
    // NOTE(review): no reader/writer of this map is visible in this chunk —
    // presumably populated and consumed elsewhere in the module; confirm.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
79
80/// Provider implementation contract for inbound connectors.
81#[async_trait]
82pub trait Connector: Send + Sync {
83    /// Stable provider id such as `github`, `slack`, or `webhook`.
84    fn provider_id(&self) -> &ProviderId;
85
86    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
87    fn kinds(&self) -> &[TriggerKind];
88
89    /// Called once per connector instance at orchestrator startup.
90    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
91
92    /// Activate the bindings relevant to this connector instance.
93    async fn activate(
94        &self,
95        bindings: &[TriggerBinding],
96    ) -> Result<ActivationHandle, ConnectorError>;
97
98    /// Stop connector-owned background work and flush any connector-local state.
99    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
100        Ok(())
101    }
102
103    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
104    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
105
106    /// Verify + normalize a provider-native inbound request into the richer
107    /// connector result contract used by ack-first webhook adapters.
108    async fn normalize_inbound_result(
109        &self,
110        raw: RawInbound,
111    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
112        self.normalize_inbound(raw)
113            .await
114            .map(ConnectorNormalizeResult::event)
115    }
116
117    /// Payload schema surfaced to future trigger-type narrowing.
118    fn payload_schema(&self) -> ProviderPayloadSchema;
119
120    /// Outbound API wrapper exposed to handlers.
121    fn client(&self) -> Arc<dyn ConnectorClient>;
122}
123
/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code to return to the caller.
    pub status: u16,
    /// Response headers; `BTreeMap` keeps ordering deterministic for `PartialEq`.
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
131
132impl ConnectorHttpResponse {
133    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
134        Self {
135            status,
136            headers,
137            body,
138        }
139    }
140}
141
/// Normalized inbound result accepted by the runtime connector adapter.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// A single normalized event (boxed to keep the enum small).
    Event(Box<TriggerEvent>),
    /// Several events produced from one inbound request.
    Batch(Vec<TriggerEvent>),
    /// Respond to the caller immediately, then dispatch `events` (possibly none).
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the request with the given response; nothing is dispatched.
    Reject(ConnectorHttpResponse),
}
153
154impl ConnectorNormalizeResult {
155    pub fn event(event: TriggerEvent) -> Self {
156        Self::Event(Box::new(event))
157    }
158
159    pub fn into_events(self) -> Vec<TriggerEvent> {
160        match self {
161            Self::Event(event) => vec![*event],
162            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
163            Self::Reject(_) => Vec::new(),
164        }
165    }
166}
167
/// Outcome of dedupe post-processing for a normalized trigger event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe (or dedupe was disabled) and is ready to dispatch.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed; caller should drop it silently.
    DuplicateDropped,
}
173
174pub async fn postprocess_normalized_event(
175    inbox: &InboxIndex,
176    binding_id: &str,
177    dedupe_enabled: bool,
178    dedupe_ttl: StdDuration,
179    mut event: TriggerEvent,
180) -> Result<PostNormalizeOutcome, ConnectorError> {
181    if dedupe_enabled && !event.dedupe_claimed() {
182        if !inbox
183            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
184            .await?
185        {
186            return Ok(PostNormalizeOutcome::DuplicateDropped);
187        }
188        event.mark_dedupe_claimed();
189    }
190
191    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
192}
193
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes a provider API method by name with JSON arguments, returning
    /// the provider's JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
199
/// Minimal outbound client errors shared by connector implementations.
/// Every variant carries a preformatted, human-readable message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested API method name is not implemented by this client.
    MethodNotFound(String),
    /// The JSON arguments did not match the method's expectations.
    InvalidArgs(String),
    /// The provider throttled the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Anything that does not fit the categories above.
    Other(String),
}
209
210impl fmt::Display for ClientError {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        match self {
213            Self::MethodNotFound(message)
214            | Self::InvalidArgs(message)
215            | Self::RateLimited(message)
216            | Self::Transport(message)
217            | Self::Other(message) => message.fmt(f),
218        }
219    }
220}
221
222impl std::error::Error for ClientError {}
223
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector with this provider id is already registered.
    DuplicateProvider(String),
    /// The delivery was already processed; message is preformatted.
    DuplicateDelivery(String),
    /// No connector is registered under this provider id.
    UnknownProvider(String),
    /// A required inbound header was absent.
    MissingHeader(String),
    /// An inbound header was present but unusable.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Inbound signature verification failed; message is preformatted.
    InvalidSignature(String),
    /// Inbound timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure (see `From<serde_json::Error>`).
    Json(String),
    /// Secret lookup failure (see `From<SecretError>`).
    Secret(String),
    /// Event-log failure (see `From<LogError>`).
    EventLog(String),
    /// Error string from harn runtime integration.
    HarnRuntime(String),
    /// Outbound client call failed (see `From<ClientError>`).
    Client(ClientError),
    /// Operation not supported by this connector; message is preformatted.
    Unsupported(String),
    /// Binding activation failed; message is preformatted.
    Activation(String),
}
249
250impl ConnectorError {
251    pub fn invalid_signature(message: impl Into<String>) -> Self {
252        Self::InvalidSignature(message.into())
253    }
254}
255
256impl fmt::Display for ConnectorError {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        match self {
259            Self::DuplicateProvider(provider) => {
260                write!(f, "connector provider `{provider}` is already registered")
261            }
262            Self::DuplicateDelivery(message) => message.fmt(f),
263            Self::UnknownProvider(provider) => {
264                write!(f, "connector provider `{provider}` is not registered")
265            }
266            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
267            Self::InvalidHeader { name, detail } => {
268                write!(f, "invalid header `{name}`: {detail}")
269            }
270            Self::InvalidSignature(message)
271            | Self::Json(message)
272            | Self::Secret(message)
273            | Self::EventLog(message)
274            | Self::HarnRuntime(message)
275            | Self::Unsupported(message)
276            | Self::Activation(message) => message.fmt(f),
277            Self::TimestampOutOfWindow {
278                timestamp,
279                now,
280                window,
281            } => write!(
282                f,
283                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
284            ),
285            Self::Client(error) => error.fmt(f),
286        }
287    }
288}
289
290impl std::error::Error for ConnectorError {}
291
292impl From<crate::event_log::LogError> for ConnectorError {
293    fn from(value: crate::event_log::LogError) -> Self {
294        Self::EventLog(value.to_string())
295    }
296}
297
298impl From<crate::secrets::SecretError> for ConnectorError {
299    fn from(value: crate::secrets::SecretError) -> Self {
300        Self::Secret(value.to_string())
301    }
302}
303
304impl From<serde_json::Error> for ConnectorError {
305    fn from(value: serde_json::Error) -> Self {
306        Self::Json(value.to_string())
307    }
308}
309
310impl From<ClientError> for ConnectorError {
311    fn from(value: ClientError) -> Self {
312        Self::Client(value)
313    }
314}
315
/// Startup context shared with connector instances.
///
/// Passed to [`Connector::init`]; every field is `Arc`-shared so connectors
/// can retain only the pieces they need.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Secret lookup backend.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index consulted by `postprocess_normalized_event`.
    pub inbox: Arc<InboxIndex>,
    /// Connector-local metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    // NOTE(review): RateLimiterFactory is declared elsewhere in this module;
    // presumably produces per-provider rate limiters — confirm at use sites.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
325
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Field-for-field copy of the atomic counters in [`MetricsRegistry`]; see
/// [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
342
/// Label set attached to a metric sample; `BTreeMap` gives a stable ordering
/// so label sets can be used as map keys.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated histogram state: per-bucket counts plus running count and sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
351
352static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
353
354pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
355    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
356    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
357}
358
359pub fn clear_active_metrics_registry() {
360    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
361        *slot.lock().expect("active metrics registry poisoned") = None;
362    }
363}
364
365pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
366    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
367        slot.lock()
368            .expect("active metrics registry poisoned")
369            .clone()
370    })
371}
372
/// Shared metrics surface for connector-local counters and timings.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed atomic counters, snapshotted by `snapshot()`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Ad-hoc named counters fed by `record_custom_counter`.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled counter / gauge / histogram families keyed by (name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per-label-set map of pending event id -> accepted-at timestamp (ms),
    // maintained by note/clear_trigger_pending_event.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
394
395impl MetricsRegistry {
    /// Bucket upper bounds (seconds) for short operation latencies.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Wider bucket layout (seconds) for end-to-end trigger lifecycle latencies.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket upper bounds (bytes) for payload-size histograms.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
403
404    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
405        ConnectorMetricsSnapshot {
406            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
407            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
408            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
409            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
410            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
411            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
412            linear_timestamp_rejections_total: self
413                .linear_timestamp_rejections_total
414                .load(Ordering::Relaxed),
415            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
416            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
417            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
418            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
419            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
420        }
421    }
422
423    pub(crate) fn record_inbox_claim(&self) {
424        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
425    }
426
427    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
428        self.inbox_duplicates_rejected
429            .fetch_add(1, Ordering::Relaxed);
430        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
431    }
432
433    pub(crate) fn record_inbox_duplicate_durable(&self) {
434        self.inbox_duplicates_rejected
435            .fetch_add(1, Ordering::Relaxed);
436        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
437    }
438
439    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
440        if count > 0 {
441            self.inbox_expired_entries
442                .fetch_add(count, Ordering::Relaxed);
443        }
444    }
445
446    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
447        self.inbox_active_entries
448            .store(count as u64, Ordering::Relaxed);
449    }
450
451    pub fn record_linear_timestamp_rejection(&self) {
452        self.linear_timestamp_rejections_total
453            .fetch_add(1, Ordering::Relaxed);
454    }
455
456    pub fn record_dispatch_succeeded(&self) {
457        self.dispatch_succeeded_total
458            .fetch_add(1, Ordering::Relaxed);
459    }
460
461    pub fn record_dispatch_failed(&self) {
462        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
463    }
464
465    pub fn record_retry_scheduled(&self) {
466        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
467    }
468
469    pub fn record_slack_delivery_success(&self) {
470        self.slack_delivery_success_total
471            .fetch_add(1, Ordering::Relaxed);
472    }
473
474    pub fn record_slack_delivery_failure(&self) {
475        self.slack_delivery_failure_total
476            .fetch_add(1, Ordering::Relaxed);
477    }
478
479    pub fn record_custom_counter(&self, name: &str, amount: u64) {
480        if amount == 0 {
481            return;
482        }
483        let mut counters = self
484            .custom_counters
485            .lock()
486            .expect("custom counters poisoned");
487        *counters.entry(name.to_string()).or_default() += amount;
488    }
489
490    pub fn record_http_request(
491        &self,
492        endpoint: &str,
493        method: &str,
494        status: u16,
495        duration: StdDuration,
496        body_size_bytes: usize,
497    ) {
498        self.increment_counter(
499            "harn_http_requests_total",
500            labels([
501                ("endpoint", endpoint),
502                ("method", method),
503                ("status", &status.to_string()),
504            ]),
505            1,
506        );
507        self.observe_histogram(
508            "harn_http_request_duration_seconds",
509            labels([("endpoint", endpoint)]),
510            duration.as_secs_f64(),
511            &Self::DURATION_BUCKETS,
512        );
513        self.observe_histogram(
514            "harn_http_body_size_bytes",
515            labels([("endpoint", endpoint)]),
516            body_size_bytes as f64,
517            &Self::SIZE_BUCKETS,
518        );
519    }
520
521    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
522        self.increment_counter(
523            "harn_trigger_received_total",
524            labels([("trigger_id", trigger_id), ("provider", provider)]),
525            1,
526        );
527    }
528
529    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
530        self.increment_counter(
531            "harn_trigger_deduped_total",
532            labels([("trigger_id", trigger_id), ("reason", reason)]),
533            1,
534        );
535    }
536
537    pub fn record_trigger_predicate_evaluation(
538        &self,
539        trigger_id: &str,
540        result: bool,
541        cost_usd: f64,
542    ) {
543        self.increment_counter(
544            "harn_trigger_predicate_evaluations_total",
545            labels([
546                ("trigger_id", trigger_id),
547                ("result", if result { "true" } else { "false" }),
548            ]),
549            1,
550        );
551        self.observe_histogram(
552            "harn_trigger_predicate_cost_usd",
553            labels([("trigger_id", trigger_id)]),
554            cost_usd.max(0.0),
555            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
556        );
557    }
558
559    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
560        self.increment_counter(
561            "harn_trigger_dispatched_total",
562            labels([
563                ("trigger_id", trigger_id),
564                ("handler_kind", handler_kind),
565                ("outcome", outcome),
566            ]),
567            1,
568        );
569    }
570
571    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
572        self.increment_counter(
573            "harn_trigger_retries_total",
574            labels([
575                ("trigger_id", trigger_id),
576                ("attempt", &attempt.to_string()),
577            ]),
578            1,
579        );
580    }
581
582    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
583        self.increment_counter(
584            "harn_trigger_dlq_total",
585            labels([("trigger_id", trigger_id), ("reason", reason)]),
586            1,
587        );
588    }
589
590    pub fn record_trigger_accepted_to_normalized(
591        &self,
592        trigger_id: &str,
593        binding_key: &str,
594        provider: &str,
595        tenant_id: Option<&str>,
596        status: &str,
597        duration: StdDuration,
598    ) {
599        self.observe_histogram(
600            "harn_trigger_webhook_accepted_to_normalized_seconds",
601            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
602            duration.as_secs_f64(),
603            &Self::TRIGGER_LATENCY_BUCKETS,
604        );
605    }
606
607    pub fn record_trigger_accepted_to_queue_append(
608        &self,
609        trigger_id: &str,
610        binding_key: &str,
611        provider: &str,
612        tenant_id: Option<&str>,
613        status: &str,
614        duration: StdDuration,
615    ) {
616        self.observe_histogram(
617            "harn_trigger_webhook_accepted_to_queue_append_seconds",
618            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
619            duration.as_secs_f64(),
620            &Self::TRIGGER_LATENCY_BUCKETS,
621        );
622    }
623
624    pub fn record_trigger_queue_age_at_dispatch_admission(
625        &self,
626        trigger_id: &str,
627        binding_key: &str,
628        provider: &str,
629        tenant_id: Option<&str>,
630        status: &str,
631        age: StdDuration,
632    ) {
633        self.observe_histogram(
634            "harn_trigger_queue_age_at_dispatch_admission_seconds",
635            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
636            age.as_secs_f64(),
637            &Self::TRIGGER_LATENCY_BUCKETS,
638        );
639    }
640
641    pub fn record_trigger_queue_age_at_dispatch_start(
642        &self,
643        trigger_id: &str,
644        binding_key: &str,
645        provider: &str,
646        tenant_id: Option<&str>,
647        status: &str,
648        age: StdDuration,
649    ) {
650        self.observe_histogram(
651            "harn_trigger_queue_age_at_dispatch_start_seconds",
652            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
653            age.as_secs_f64(),
654            &Self::TRIGGER_LATENCY_BUCKETS,
655        );
656    }
657
658    pub fn record_trigger_dispatch_runtime(
659        &self,
660        trigger_id: &str,
661        binding_key: &str,
662        provider: &str,
663        tenant_id: Option<&str>,
664        status: &str,
665        duration: StdDuration,
666    ) {
667        self.observe_histogram(
668            "harn_trigger_dispatch_runtime_seconds",
669            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
670            duration.as_secs_f64(),
671            &Self::TRIGGER_LATENCY_BUCKETS,
672        );
673    }
674
675    pub fn record_trigger_retry_delay(
676        &self,
677        trigger_id: &str,
678        binding_key: &str,
679        provider: &str,
680        tenant_id: Option<&str>,
681        status: &str,
682        duration: StdDuration,
683    ) {
684        self.observe_histogram(
685            "harn_trigger_retry_delay_seconds",
686            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
687            duration.as_secs_f64(),
688            &Self::TRIGGER_LATENCY_BUCKETS,
689        );
690    }
691
692    pub fn record_trigger_accepted_to_dlq(
693        &self,
694        trigger_id: &str,
695        binding_key: &str,
696        provider: &str,
697        tenant_id: Option<&str>,
698        status: &str,
699        duration: StdDuration,
700    ) {
701        self.observe_histogram(
702            "harn_trigger_accepted_to_dlq_seconds",
703            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
704            duration.as_secs_f64(),
705            &Self::TRIGGER_LATENCY_BUCKETS,
706        );
707    }
708
709    pub fn note_trigger_pending_event(
710        &self,
711        event_id: &str,
712        trigger_id: &str,
713        binding_key: &str,
714        provider: &str,
715        tenant_id: Option<&str>,
716        accepted_at_ms: i64,
717        now_ms: i64,
718    ) {
719        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
720        {
721            let mut pending = self
722                .pending_trigger_events
723                .lock()
724                .expect("pending trigger events poisoned");
725            pending
726                .entry(labels.clone())
727                .or_default()
728                .insert(event_id.to_string(), accepted_at_ms);
729        }
730        self.refresh_oldest_pending_gauge(labels, now_ms);
731    }
732
733    pub fn clear_trigger_pending_event(
734        &self,
735        event_id: &str,
736        trigger_id: &str,
737        binding_key: &str,
738        provider: &str,
739        tenant_id: Option<&str>,
740        now_ms: i64,
741    ) {
742        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
743        {
744            let mut pending = self
745                .pending_trigger_events
746                .lock()
747                .expect("pending trigger events poisoned");
748            if let Some(events) = pending.get_mut(&labels) {
749                events.remove(event_id);
750                if events.is_empty() {
751                    pending.remove(&labels);
752                }
753            }
754        }
755        self.refresh_oldest_pending_gauge(labels, now_ms);
756    }
757
758    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
759        self.set_gauge(
760            "harn_trigger_inflight",
761            labels([("trigger_id", trigger_id)]),
762            count as f64,
763        );
764    }
765
766    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
767        self.set_gauge(
768            "harn_trigger_budget_cost_today_usd",
769            labels([("trigger_id", trigger_id)]),
770            cost_usd.max(0.0),
771        );
772    }
773
774    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
775        self.increment_counter(
776            "harn_trigger_budget_exhausted_total",
777            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
778            1,
779        );
780    }
781
782    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
783        self.increment_counter(
784            "harn_backpressure_events_total",
785            labels([("dimension", dimension), ("action", action)]),
786            1,
787        );
788    }
789
790    pub fn record_event_log_append(
791        &self,
792        topic: &str,
793        duration: StdDuration,
794        payload_bytes: usize,
795    ) {
796        self.observe_histogram(
797            "harn_event_log_append_duration_seconds",
798            labels([("topic", topic)]),
799            duration.as_secs_f64(),
800            &Self::DURATION_BUCKETS,
801        );
802        self.set_gauge(
803            "harn_event_log_topic_size_bytes",
804            labels([("topic", topic)]),
805            payload_bytes as f64,
806        );
807    }
808
809    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
810        self.set_gauge(
811            "harn_event_log_consumer_lag",
812            labels([("topic", topic), ("consumer", consumer)]),
813            lag as f64,
814        );
815    }
816
817    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
818        self.increment_counter(
819            "harn_a2a_hops_total",
820            labels([("target", target), ("outcome", outcome)]),
821            1,
822        );
823        self.observe_histogram(
824            "harn_a2a_hop_duration_seconds",
825            labels([("target", target)]),
826            duration.as_secs_f64(),
827            &Self::DURATION_BUCKETS,
828        );
829    }
830
831    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
832        self.set_gauge(
833            "harn_worker_queue_depth",
834            labels([("queue", queue)]),
835            depth as f64,
836        );
837    }
838
839    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
840        self.observe_histogram(
841            "harn_worker_queue_claim_age_seconds",
842            labels([("queue", queue)]),
843            age_seconds.max(0.0),
844            &Self::DURATION_BUCKETS,
845        );
846    }
847
848    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
849        self.set_gauge(
850            "harn_orchestrator_pump_backlog",
851            labels([("topic", topic)]),
852            count as f64,
853        );
854    }
855
856    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
857        self.set_gauge(
858            "harn_orchestrator_pump_outstanding",
859            labels([("topic", topic)]),
860            count as f64,
861        );
862    }
863
864    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
865        self.observe_histogram(
866            "harn_orchestrator_pump_admission_delay_seconds",
867            labels([("topic", topic)]),
868            duration.as_secs_f64(),
869            &Self::DURATION_BUCKETS,
870        );
871    }
872
873    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
874        self.increment_counter(
875            "harn_llm_calls_total",
876            labels([
877                ("provider", provider),
878                ("model", model),
879                ("outcome", outcome),
880            ]),
881            1,
882        );
883        if cost_usd > 0.0 {
884            self.increment_counter(
885                "harn_llm_cost_usd_total",
886                labels([("provider", provider), ("model", model)]),
887                cost_usd,
888            );
889        } else {
890            self.ensure_counter(
891                "harn_llm_cost_usd_total",
892                labels([("provider", provider), ("model", model)]),
893            );
894        }
895    }
896
897    pub fn record_llm_cache_hit(&self, provider: &str) {
898        self.increment_counter(
899            "harn_llm_cache_hits_total",
900            labels([("provider", provider)]),
901            1,
902        );
903    }
904
905    pub fn render_prometheus(&self) -> String {
906        let snapshot = self.snapshot();
907        let counters = [
908            (
909                "connector_linear_timestamp_rejections_total",
910                snapshot.linear_timestamp_rejections_total,
911            ),
912            (
913                "dispatch_succeeded_total",
914                snapshot.dispatch_succeeded_total,
915            ),
916            ("dispatch_failed_total", snapshot.dispatch_failed_total),
917            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
918            ("retry_scheduled_total", snapshot.retry_scheduled_total),
919            (
920                "slack_events_delivery_success_total",
921                snapshot.slack_delivery_success_total,
922            ),
923            (
924                "slack_events_delivery_failure_total",
925                snapshot.slack_delivery_failure_total,
926            ),
927        ];
928
929        let mut rendered = String::new();
930        for (name, value) in counters {
931            rendered.push_str("# TYPE ");
932            rendered.push_str(name);
933            rendered.push_str(" counter\n");
934            rendered.push_str(name);
935            rendered.push(' ');
936            rendered.push_str(&value.to_string());
937            rendered.push('\n');
938        }
939        let custom_counters = self
940            .custom_counters
941            .lock()
942            .expect("custom counters poisoned");
943        for (name, value) in custom_counters.iter() {
944            let metric_name = format!(
945                "connector_custom_{}_total",
946                name.chars()
947                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
948                        ch
949                    } else {
950                        '_'
951                    })
952                    .collect::<String>()
953            );
954            rendered.push_str("# TYPE ");
955            rendered.push_str(&metric_name);
956            rendered.push_str(" counter\n");
957            rendered.push_str(&metric_name);
958            rendered.push(' ');
959            rendered.push_str(&value.to_string());
960            rendered.push('\n');
961        }
962        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
963        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
964        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
965        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
966        self.render_generic_metrics(&mut rendered);
967        rendered
968    }
969
970    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
971        let amount = amount.into();
972        if amount <= 0.0 || !amount.is_finite() {
973            return;
974        }
975        let mut counters = self.counters.lock().expect("metrics counters poisoned");
976        *counters.entry((name.to_string(), labels)).or_default() += amount;
977    }
978
979    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
980        let mut counters = self.counters.lock().expect("metrics counters poisoned");
981        counters.entry((name.to_string(), labels)).or_default();
982    }
983
984    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
985        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
986        gauges.insert((name.to_string(), labels), value);
987    }
988
989    fn observe_histogram(
990        &self,
991        name: &str,
992        labels: MetricLabels,
993        value: f64,
994        bucket_bounds: &[f64],
995    ) {
996        if !value.is_finite() {
997            return;
998        }
999        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1000        let histogram = histograms
1001            .entry((name.to_string(), labels))
1002            .or_insert_with(|| HistogramMetric {
1003                buckets: bucket_bounds
1004                    .iter()
1005                    .map(|bound| (prometheus_float(*bound), 0))
1006                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1007                    .collect(),
1008                count: 0,
1009                sum: 0.0,
1010            });
1011        histogram.count += 1;
1012        histogram.sum += value;
1013        for bound in bucket_bounds {
1014            if value <= *bound {
1015                let key = prometheus_float(*bound);
1016                *histogram.buckets.entry(key).or_default() += 1;
1017            }
1018        }
1019        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1020    }
1021
1022    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1023        let oldest_accepted_at_ms = self
1024            .pending_trigger_events
1025            .lock()
1026            .expect("pending trigger events poisoned")
1027            .get(&labels)
1028            .and_then(|events| events.values().min().copied());
1029        let age_seconds = oldest_accepted_at_ms
1030            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1031            .unwrap_or(0.0);
1032        self.set_gauge(
1033            "harn_trigger_oldest_pending_age_seconds",
1034            labels,
1035            age_seconds,
1036        );
1037    }
1038
1039    fn render_generic_metrics(&self, rendered: &mut String) {
1040        let counters = self
1041            .counters
1042            .lock()
1043            .expect("metrics counters poisoned")
1044            .clone();
1045        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1046        let histograms = self
1047            .histograms
1048            .lock()
1049            .expect("metrics histograms poisoned")
1050            .clone();
1051
1052        for name in metric_family_names(MetricKind::Counter) {
1053            rendered.push_str("# TYPE ");
1054            rendered.push_str(name);
1055            rendered.push_str(" counter\n");
1056            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1057                render_sample(rendered, sample_name, labels, *value);
1058            }
1059        }
1060        for name in metric_family_names(MetricKind::Gauge) {
1061            rendered.push_str("# TYPE ");
1062            rendered.push_str(name);
1063            rendered.push_str(" gauge\n");
1064            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1065                render_sample(rendered, sample_name, labels, *value);
1066            }
1067        }
1068        for name in metric_family_names(MetricKind::Histogram) {
1069            rendered.push_str("# TYPE ");
1070            rendered.push_str(name);
1071            rendered.push_str(" histogram\n");
1072            for ((sample_name, labels), histogram) in
1073                histograms.iter().filter(|((n, _), _)| n == name)
1074            {
1075                for (le, value) in &histogram.buckets {
1076                    let mut bucket_labels = labels.clone();
1077                    bucket_labels.insert("le".to_string(), le.clone());
1078                    render_sample(
1079                        rendered,
1080                        &format!("{sample_name}_bucket"),
1081                        &bucket_labels,
1082                        *value as f64,
1083                    );
1084                }
1085                render_sample(
1086                    rendered,
1087                    &format!("{sample_name}_sum"),
1088                    labels,
1089                    histogram.sum,
1090                );
1091                render_sample(
1092                    rendered,
1093                    &format!("{sample_name}_count"),
1094                    labels,
1095                    histogram.count as f64,
1096                );
1097            }
1098        }
1099    }
1100}
1101
/// Discriminator for the three Prometheus metric families rendered by
/// `render_generic_metrics`.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1108
/// Static registry of metric-family names for each metric kind.
///
/// A metric recorded through `increment_counter`/`set_gauge`/`observe_histogram`
/// only appears in the rendered output if its family is listed here, and the
/// slice order is the render order — do not reorder entries casually.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1154
1155fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1156    pairs
1157        .into_iter()
1158        .map(|(name, value)| (name.to_string(), value.to_string()))
1159        .collect()
1160}
1161
1162fn trigger_lifecycle_labels(
1163    trigger_id: &str,
1164    binding_key: &str,
1165    provider: &str,
1166    tenant_id: Option<&str>,
1167    status: &str,
1168) -> MetricLabels {
1169    labels([
1170        ("binding_key", binding_key),
1171        ("provider", provider),
1172        ("status", status),
1173        ("tenant_id", tenant_label(tenant_id)),
1174        ("trigger_id", trigger_id),
1175    ])
1176}
1177
1178fn trigger_pending_labels(
1179    trigger_id: &str,
1180    binding_key: &str,
1181    provider: &str,
1182    tenant_id: Option<&str>,
1183) -> MetricLabels {
1184    labels([
1185        ("binding_key", binding_key),
1186        ("provider", provider),
1187        ("tenant_id", tenant_label(tenant_id)),
1188        ("trigger_id", trigger_id),
1189    ])
1190}
1191
/// Normalize an optional tenant id into a label value, mapping a missing or
/// blank id to the literal `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1198
/// Non-negative difference between two millisecond timestamps.
///
/// Clamps to zero when `earlier_ms` is ahead of `later_ms` (clock skew).
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(delta_ms).unwrap_or(0))
}
1202
1203fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1204    rendered.push_str(name);
1205    if !labels.is_empty() {
1206        rendered.push('{');
1207        for (index, (label, label_value)) in labels.iter().enumerate() {
1208            if index > 0 {
1209                rendered.push(',');
1210            }
1211            rendered.push_str(label);
1212            rendered.push_str("=\"");
1213            rendered.push_str(&escape_label_value(label_value));
1214            rendered.push('"');
1215        }
1216        rendered.push('}');
1217    }
1218    rendered.push(' ');
1219    rendered.push_str(&prometheus_float(value));
1220    rendered.push('\n');
1221}
1222
/// Escape a label value for the Prometheus text exposition format.
///
/// Backslash, double quote and newline are the only characters the format
/// requires escaping; everything else passes through unchanged. Builds the
/// output directly into one `String` instead of allocating a `Vec<char>` per
/// character as the previous `flat_map` implementation did.
fn escape_label_value(value: &str) -> String {
    // Common case: nothing needs escaping, so the input length is a good guess.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1234
/// Format an `f64` the way the Prometheus text format expects.
///
/// Integral values render without a decimal point, fractional values are
/// rounded to six decimals with trailing zeros stripped, and the special
/// values render with the exposition-format spellings `+Inf`, `-Inf` and
/// `NaN`. (Rust's `format!` would otherwise produce `inf`/`-inf`/`NaN`; the
/// previous implementation only special-cased positive infinity, so `-inf`
/// leaked into the output.)
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        format!("{value:.6}")
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1249
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Registered harn schema name for the provider's payloads.
    pub harn_schema_name: String,
    /// Optional JSON Schema document; `Null` when the provider ships none.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1257
1258impl ProviderPayloadSchema {
1259    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1260        Self {
1261            harn_schema_name: harn_schema_name.into(),
1262            json_schema,
1263        }
1264    }
1265
1266    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1267        Self::new(harn_schema_name, JsonValue::Null)
1268    }
1269}
1270
1271impl Default for ProviderPayloadSchema {
1272    fn default() -> Self {
1273        Self::named("raw")
1274    }
1275}
1276
/// High-level transport kind a connector supports.
///
/// Serialized transparently as its inner string (e.g. `"webhook"`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1281
1282impl TriggerKind {
1283    pub fn new(value: impl Into<String>) -> Self {
1284        Self(value.into())
1285    }
1286
1287    pub fn as_str(&self) -> &str {
1288        self.0.as_str()
1289    }
1290}
1291
1292impl From<&str> for TriggerKind {
1293    fn from(value: &str) -> Self {
1294        Self::new(value)
1295    }
1296}
1297
1298impl From<String> for TriggerKind {
1299    fn from(value: String) -> Self {
1300        Self::new(value)
1301    }
1302}
1303
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding targets.
    pub provider: ProviderId,
    /// Transport kind within the provider (e.g. webhook vs cron).
    pub kind: TriggerKind,
    /// Stable identifier of the binding.
    pub binding_id: String,
    /// Optional explicit dedupe key; `None` leaves deduplication to defaults.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Retention window for dedupe bookkeeping, in days.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration blob; `Null` when absent.
    #[serde(default)]
    pub config: JsonValue,
}
1317
1318impl TriggerBinding {
1319    pub fn new(
1320        provider: ProviderId,
1321        kind: impl Into<TriggerKind>,
1322        binding_id: impl Into<String>,
1323    ) -> Self {
1324        Self {
1325            provider,
1326            kind: kind.into(),
1327            binding_id: binding_id.into(),
1328            dedupe_key: None,
1329            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1330            config: JsonValue::Null,
1331        }
1332    }
1333}
1334
// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1338
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Bindings grouped by the provider they target, in sorted provider order.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1344
1345impl TriggerRegistry {
1346    pub fn register(&mut self, binding: TriggerBinding) {
1347        self.bindings
1348            .entry(binding.provider.clone())
1349            .or_default()
1350            .push(binding);
1351    }
1352
1353    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1354        &self.bindings
1355    }
1356
1357    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1358        self.bindings
1359            .get(provider)
1360            .map(Vec::as_slice)
1361            .unwrap_or(&[])
1362    }
1363}
1364
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of trigger bindings handed to `activate`.
    pub binding_count: usize,
}
1371
1372impl ActivationHandle {
1373    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1374        Self {
1375            provider,
1376            binding_count,
1377        }
1378    }
1379}
1380
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Provider-specific event kind label.
    pub kind: String,
    /// Transport headers, keyed and sorted by header name.
    pub headers: BTreeMap<String, String>,
    /// Query parameters; `new` initializes this empty.
    pub query: BTreeMap<String, String>,
    /// Raw, unparsed body bytes.
    pub body: Vec<u8>,
    /// When this process received the payload (`new` stamps the clock).
    pub received_at: OffsetDateTime,
    /// Provider-reported event time, when known.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant attribution, when the transport carries it.
    pub tenant_id: Option<TenantId>,
    /// Extra transport metadata; `Null` when absent.
    pub metadata: JsonValue,
}
1393
1394impl RawInbound {
1395    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1396        Self {
1397            kind: kind.into(),
1398            headers,
1399            query: BTreeMap::new(),
1400            body,
1401            received_at: clock::now_utc(),
1402            occurred_at: None,
1403            tenant_id: None,
1404            metadata: JsonValue::Null,
1405        }
1406    }
1407
1408    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1409        Ok(serde_json::from_slice(&self.body)?)
1410    }
1411}
1412
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens the bucket can hold (burst size).
    pub capacity: u32,
    /// Tokens accrued per `refill_interval`.
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` are accrued.
    pub refill_interval: StdDuration,
}
1420
1421impl Default for RateLimitConfig {
1422    fn default() -> Self {
1423        Self {
1424            capacity: 60,
1425            refill_tokens: 1,
1426            refill_interval: StdDuration::from_secs(1),
1427        }
1428    }
1429}
1430
/// Mutable token-bucket state for one rate-limit scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional token balance; acquisition requires >= 1.0.
    tokens: f64,
    // Instant of the last refill accrual.
    last_refill: ClockInstant,
}
1436
1437impl TokenBucket {
1438    fn full(config: RateLimitConfig) -> Self {
1439        Self {
1440            tokens: config.capacity as f64,
1441            last_refill: clock::instant_now(),
1442        }
1443    }
1444
1445    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1446        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1447        let rate = config.refill_tokens.max(1) as f64 / interval;
1448        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1449        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1450        self.last_refill = now;
1451    }
1452
1453    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1454        self.refill(config, now);
1455        if self.tokens >= 1.0 {
1456            self.tokens -= 1.0;
1457            true
1458        } else {
1459            false
1460        }
1461    }
1462
1463    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1464        if self.tokens >= 1.0 {
1465            return StdDuration::ZERO;
1466        }
1467        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1468        let rate = config.refill_tokens.max(1) as f64 / interval;
1469        let missing = (1.0 - self.tokens).max(0.0);
1470        StdDuration::from_secs_f64((missing / rate).max(0.001))
1471    }
1472}
1473
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    // Configuration applied to every bucket this factory creates.
    config: RateLimitConfig,
    // Buckets keyed by (provider id string, caller-supplied key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1480
1481impl RateLimiterFactory {
1482    pub fn new(config: RateLimitConfig) -> Self {
1483        Self {
1484            config,
1485            buckets: Mutex::new(HashMap::new()),
1486        }
1487    }
1488
1489    pub fn config(&self) -> RateLimitConfig {
1490        self.config
1491    }
1492
1493    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1494        ScopedRateLimiter {
1495            factory: self,
1496            provider: provider.clone(),
1497            key: key.into(),
1498        }
1499    }
1500
1501    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1502        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1503        let bucket = buckets
1504            .entry((provider.as_str().to_string(), key.to_string()))
1505            .or_insert_with(|| TokenBucket::full(self.config));
1506        bucket.try_acquire(self.config, clock::instant_now())
1507    }
1508
1509    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1510        loop {
1511            let wait = {
1512                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1513                let bucket = buckets
1514                    .entry((provider.as_str().to_string(), key.to_string()))
1515                    .or_insert_with(|| TokenBucket::full(self.config));
1516                if bucket.try_acquire(self.config, clock::instant_now()) {
1517                    return;
1518                }
1519                bucket.wait_duration(self.config)
1520            };
1521            tokio::time::sleep(wait).await;
1522        }
1523    }
1524}
1525
1526impl Default for RateLimiterFactory {
1527    fn default() -> Self {
1528        Self::new(RateLimitConfig::default())
1529    }
1530}
1531
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Factory owning the underlying bucket map.
    factory: &'a RateLimiterFactory,
    // Provider scope for this limiter.
    provider: ProviderId,
    // Caller-chosen sub-key within the provider.
    key: String,
}
1539
1540impl<'a> ScopedRateLimiter<'a> {
1541    pub fn try_acquire(&self) -> bool {
1542        self.factory.try_acquire(&self.provider, &self.key)
1543    }
1544
1545    pub async fn acquire(&self) {
1546        self.factory.acquire(&self.provider, &self.key).await;
1547    }
1548}
1549
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    // `ConnectorHandle` (declared elsewhere in this module) wraps each
    // connector in `Arc<AsyncMutex<..>>` — see `register` — so `init`'s
    // `&mut` access and shared handles can coexist.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1554
1555impl ConnectorRegistry {
1556    pub fn empty() -> Self {
1557        Self {
1558            connectors: BTreeMap::new(),
1559        }
1560    }
1561
1562    pub fn with_defaults() -> Self {
1563        let mut registry = Self::empty();
1564        for provider in registered_provider_metadata() {
1565            registry
1566                .register(default_connector_for_provider(&provider))
1567                .expect("default connector registration should not fail");
1568        }
1569        registry
1570    }
1571
1572    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1573        let provider = connector.provider_id().clone();
1574        if self.connectors.contains_key(&provider) {
1575            return Err(ConnectorError::DuplicateProvider(provider.0));
1576        }
1577        self.connectors
1578            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1579        Ok(())
1580    }
1581
1582    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1583        self.connectors.get(id).cloned()
1584    }
1585
1586    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1587        self.connectors.remove(id)
1588    }
1589
1590    pub fn list(&self) -> Vec<ProviderId> {
1591        self.connectors.keys().cloned().collect()
1592    }
1593
1594    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1595        for connector in self.connectors.values() {
1596            connector.lock().await.init(ctx.clone()).await?;
1597        }
1598        Ok(())
1599    }
1600
1601    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1602        let mut clients = BTreeMap::new();
1603        for (provider, connector) in &self.connectors {
1604            let client = connector.lock().await.client();
1605            clients.insert(provider.clone(), client);
1606        }
1607        clients
1608    }
1609
1610    pub async fn activate_all(
1611        &self,
1612        registry: &TriggerRegistry,
1613    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1614        let mut handles = Vec::new();
1615        for (provider, connector) in &self.connectors {
1616            let bindings = registry.bindings_for(provider);
1617            if bindings.is_empty() {
1618                continue;
1619            }
1620            let connector = connector.lock().await;
1621            handles.push(connector.activate(bindings).await?);
1622        }
1623        Ok(handles)
1624    }
1625}
1626
1627impl Default for ConnectorRegistry {
1628    fn default() -> Self {
1629        Self::with_defaults()
1630    }
1631}
1632
1633fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1634    // The provider catalog on main registers `github` with
1635    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1636    // before a native connector existed the catalog auto-wired a
1637    // GenericWebhookConnector. Now that #170 lands a first-class
1638    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1639    // provider_id "github" here and return the native connector instead of a
1640    // webhook-backed fallback. This keeps manifests that say
1641    // `provider = "github"` pointed at the new connector without requiring
1642    // users to switch to a distinct provider_id.
1643    if provider.provider == "github" {
1644        return Box::new(GitHubConnector::new());
1645    }
1646    if provider.provider == "linear" {
1647        return Box::new(LinearConnector::new());
1648    }
1649    if provider.provider == "slack" {
1650        return Box::new(SlackConnector::new());
1651    }
1652    if provider.provider == "notion" {
1653        return Box::new(NotionConnector::new());
1654    }
1655    if provider.provider == "a2a-push" {
1656        return Box::new(A2aPushConnector::new());
1657    }
1658    match &provider.runtime {
1659        ProviderRuntimeMetadata::Builtin {
1660            connector,
1661            default_signature_variant,
1662        } => match connector.as_str() {
1663            "cron" => Box::new(CronConnector::new()),
1664            "stream" => Box::new(StreamConnector::new(
1665                ProviderId::from(provider.provider.clone()),
1666                provider.schema_name.clone(),
1667            )),
1668            "webhook" => {
1669                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1670                    .expect("catalog webhook signature variant must be valid");
1671                Box::new(GenericWebhookConnector::with_profile(
1672                    WebhookProviderProfile::new(
1673                        ProviderId::from(provider.provider.clone()),
1674                        provider.schema_name.clone(),
1675                        variant,
1676                    ),
1677                ))
1678            }
1679            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1680        },
1681        ProviderRuntimeMetadata::Placeholder => {
1682            Box::new(PlaceholderConnector::from_metadata(provider))
1683        }
1684    }
1685}
1686
/// Fallback connector for providers cataloged without a concrete runtime.
struct PlaceholderConnector {
    // Provider this placeholder stands in for.
    provider_id: ProviderId,
    // Trigger kinds copied from the catalog metadata.
    kinds: Vec<TriggerKind>,
    // Schema name advertised via `payload_schema`.
    schema_name: String,
}
1692
1693impl PlaceholderConnector {
1694    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1695        Self {
1696            provider_id: ProviderId::from(metadata.provider.clone()),
1697            kinds: metadata
1698                .kinds
1699                .iter()
1700                .cloned()
1701                .map(TriggerKind::from)
1702                .collect(),
1703            schema_name: metadata.schema_name.clone(),
1704        }
1705    }
1706}
1707
/// Client stub that rejects every outbound call.
struct PlaceholderClient;
1709
1710#[async_trait]
1711impl ConnectorClient for PlaceholderClient {
1712    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1713        Err(ClientError::Other(format!(
1714            "connector client method '{method}' is not implemented for this provider"
1715        )))
1716    }
1717}
1718
1719#[async_trait]
1720impl Connector for PlaceholderConnector {
1721    fn provider_id(&self) -> &ProviderId {
1722        &self.provider_id
1723    }
1724
1725    fn kinds(&self) -> &[TriggerKind] {
1726        &self.kinds
1727    }
1728
1729    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1730        Ok(())
1731    }
1732
1733    async fn activate(
1734        &self,
1735        bindings: &[TriggerBinding],
1736    ) -> Result<ActivationHandle, ConnectorError> {
1737        Ok(ActivationHandle::new(
1738            self.provider_id.clone(),
1739            bindings.len(),
1740        ))
1741    }
1742
1743    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1744        Err(ConnectorError::Unsupported(format!(
1745            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1746            self.provider_id.as_str()
1747        )))
1748    }
1749
1750    fn payload_schema(&self) -> ProviderPayloadSchema {
1751        ProviderPayloadSchema::named(self.schema_name.clone())
1752    }
1753
1754    fn client(&self) -> Arc<dyn ConnectorClient> {
1755        Arc::new(PlaceholderClient)
1756    }
1757}
1758
1759pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1760    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1761        *slot.borrow_mut() = clients
1762            .into_iter()
1763            .map(|(provider, client)| (provider.as_str().to_string(), client))
1764            .collect();
1765    });
1766}
1767
1768pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1769    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1770}
1771
1772pub fn clear_active_connector_clients() {
1773    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1774}
1775
1776#[cfg(test)]
1777mod tests {
1778    use super::*;
1779
1780    use std::sync::atomic::{AtomicUsize, Ordering};
1781
1782    use async_trait::async_trait;
1783    use serde_json::json;
1784
    /// Test client whose `call` echoes the method name back as JSON.
    struct NoopClient;
1786
1787    #[async_trait]
1788    impl ConnectorClient for NoopClient {
1789        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1790            Ok(json!({ "method": method }))
1791        }
1792    }
1793
    /// Minimal `Connector` used to probe registry behavior in tests.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        // Incremented on every `activate` call.
        activate_calls: Arc<AtomicUsize>,
    }
1799
1800    impl FakeConnector {
1801        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
1802            Self {
1803                provider_id: ProviderId::from(provider_id),
1804                kinds: vec![TriggerKind::from("webhook")],
1805                activate_calls,
1806            }
1807        }
1808    }
1809
1810    #[async_trait]
1811    impl Connector for FakeConnector {
1812        fn provider_id(&self) -> &ProviderId {
1813            &self.provider_id
1814        }
1815
1816        fn kinds(&self) -> &[TriggerKind] {
1817            &self.kinds
1818        }
1819
1820        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1821            Ok(())
1822        }
1823
1824        async fn activate(
1825            &self,
1826            bindings: &[TriggerBinding],
1827        ) -> Result<ActivationHandle, ConnectorError> {
1828            self.activate_calls.fetch_add(1, Ordering::SeqCst);
1829            Ok(ActivationHandle::new(
1830                self.provider_id.clone(),
1831                bindings.len(),
1832            ))
1833        }
1834
1835        async fn normalize_inbound(
1836            &self,
1837            _raw: RawInbound,
1838        ) -> Result<TriggerEvent, ConnectorError> {
1839            Err(ConnectorError::Unsupported(
1840                "not needed for registry tests".to_string(),
1841            ))
1842        }
1843
1844        fn payload_schema(&self) -> ProviderPayloadSchema {
1845            ProviderPayloadSchema::named("FakePayload")
1846        }
1847
1848        fn client(&self) -> Arc<dyn ConnectorClient> {
1849            Arc::new(NoopClient)
1850        }
1851    }
1852
1853    #[tokio::test]
1854    async fn connector_registry_rejects_duplicate_providers() {
1855        let activate_calls = Arc::new(AtomicUsize::new(0));
1856        let mut registry = ConnectorRegistry::empty();
1857        registry
1858            .register(Box::new(FakeConnector::new(
1859                "github",
1860                activate_calls.clone(),
1861            )))
1862            .unwrap();
1863
1864        let error = registry
1865            .register(Box::new(FakeConnector::new("github", activate_calls)))
1866            .unwrap_err();
1867        assert!(matches!(
1868            error,
1869            ConnectorError::DuplicateProvider(provider) if provider == "github"
1870        ));
1871    }
1872
1873    #[tokio::test]
1874    async fn connector_registry_activates_only_bound_connectors() {
1875        let github_calls = Arc::new(AtomicUsize::new(0));
1876        let slack_calls = Arc::new(AtomicUsize::new(0));
1877        let mut registry = ConnectorRegistry::empty();
1878        registry
1879            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1880            .unwrap();
1881        registry
1882            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1883            .unwrap();
1884
1885        let mut trigger_registry = TriggerRegistry::default();
1886        trigger_registry.register(TriggerBinding::new(
1887            ProviderId::from("github"),
1888            "webhook",
1889            "github.push",
1890        ));
1891        trigger_registry.register(TriggerBinding::new(
1892            ProviderId::from("github"),
1893            "webhook",
1894            "github.installation",
1895        ));
1896
1897        let handles = registry.activate_all(&trigger_registry).await.unwrap();
1898        assert_eq!(handles.len(), 1);
1899        assert_eq!(handles[0].provider.as_str(), "github");
1900        assert_eq!(handles[0].binding_count, 2);
1901        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
1902        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
1903    }
1904
1905    #[test]
1906    fn rate_limiter_scopes_tokens_by_provider_and_key() {
1907        let factory = RateLimiterFactory::new(RateLimitConfig {
1908            capacity: 1,
1909            refill_tokens: 1,
1910            refill_interval: StdDuration::from_secs(60),
1911        });
1912
1913        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1914        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1915        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1916        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1917    }
1918
1919    #[test]
1920    fn raw_inbound_json_body_preserves_raw_bytes() {
1921        let raw = RawInbound::new(
1922            "push",
1923            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1924            br#"{"ok":true}"#.to_vec(),
1925        );
1926
1927        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1928    }
1929
1930    #[test]
1931    fn connector_registry_lists_catalog_providers() {
1932        let registry = ConnectorRegistry::default();
1933        let providers = registry.list();
1934        assert!(providers.contains(&ProviderId::from("cron")));
1935        assert!(providers.contains(&ProviderId::from("github")));
1936        assert!(providers.contains(&ProviderId::from("webhook")));
1937    }
1938
1939    #[test]
1940    fn metrics_registry_exports_orchestrator_metric_families() {
1941        let metrics = MetricsRegistry::default();
1942        metrics.record_http_request(
1943            "/triggers/github",
1944            "POST",
1945            200,
1946            StdDuration::from_millis(25),
1947            512,
1948        );
1949        metrics.record_trigger_received("github-new-issue", "github");
1950        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
1951        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
1952        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
1953        metrics.record_trigger_retry("github-new-issue", 2);
1954        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
1955        metrics.set_trigger_inflight("github-new-issue", 0);
1956        metrics.record_event_log_append(
1957            "orchestrator.triggers.pending",
1958            StdDuration::from_millis(1),
1959            2048,
1960        );
1961        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
1962        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
1963        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
1964        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
1965        metrics.set_worker_queue_depth("triage", 1);
1966        metrics.record_worker_queue_claim_age("triage", 3.0);
1967        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
1968        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
1969        metrics.record_orchestrator_pump_admission_delay(
1970            "trigger.inbox.envelopes",
1971            StdDuration::from_millis(50),
1972        );
1973        metrics.record_trigger_accepted_to_normalized(
1974            "github-new-issue",
1975            "github-new-issue@v7",
1976            "github",
1977            Some("tenant-a"),
1978            "normalized",
1979            StdDuration::from_millis(25),
1980        );
1981        metrics.record_trigger_accepted_to_queue_append(
1982            "github-new-issue",
1983            "github-new-issue@v7",
1984            "github",
1985            Some("tenant-a"),
1986            "queued",
1987            StdDuration::from_millis(40),
1988        );
1989        metrics.record_trigger_queue_age_at_dispatch_admission(
1990            "github-new-issue",
1991            "github-new-issue@v7",
1992            "github",
1993            Some("tenant-a"),
1994            "admitted",
1995            StdDuration::from_millis(75),
1996        );
1997        metrics.record_trigger_queue_age_at_dispatch_start(
1998            "github-new-issue",
1999            "github-new-issue@v7",
2000            "github",
2001            Some("tenant-a"),
2002            "started",
2003            StdDuration::from_millis(125),
2004        );
2005        metrics.record_trigger_dispatch_runtime(
2006            "github-new-issue",
2007            "github-new-issue@v7",
2008            "github",
2009            Some("tenant-a"),
2010            "succeeded",
2011            StdDuration::from_millis(250),
2012        );
2013        metrics.record_trigger_retry_delay(
2014            "github-new-issue",
2015            "github-new-issue@v7",
2016            "github",
2017            Some("tenant-a"),
2018            "scheduled",
2019            StdDuration::from_secs(2),
2020        );
2021        metrics.record_trigger_accepted_to_dlq(
2022            "github-new-issue",
2023            "github-new-issue@v7",
2024            "github",
2025            Some("tenant-a"),
2026            "retry_exhausted",
2027            StdDuration::from_secs(45),
2028        );
2029        metrics.record_backpressure_event("ingest", "reject");
2030        metrics.note_trigger_pending_event(
2031            "evt-1",
2032            "github-new-issue",
2033            "github-new-issue@v7",
2034            "github",
2035            Some("tenant-a"),
2036            1_000,
2037            4_000,
2038        );
2039        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
2040        metrics.record_llm_cache_hit("mock");
2041
2042        let rendered = metrics.render_prometheus();
2043        for needle in [
2044            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
2045            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
2046            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
2047            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
2048            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
2049            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
2050            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
2051            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
2052            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
2053            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
2054            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
2055            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
2056            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
2057            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
2058            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
2059            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
2060            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
2061            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
2062            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
2063            "harn_worker_queue_depth{queue=\"triage\"} 1",
2064            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
2065            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
2066            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
2067            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
2068            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2069            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2070            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2071            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2072            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2073            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2074            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
2075            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
2076            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
2077            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
2078            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
2079        ] {
2080            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
2081        }
2082    }
2083}