// harn_vm/connectors/mod.rs

//! Connector traits and shared helpers for inbound event-source providers.
//!
//! This module lives in `harn-vm` for now because its current dependencies
//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already live here. If the
//! connector ecosystem grows enough to justify it, the module can be split
//! into a dedicated crate later without changing the high-level contract.

8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44    connector_export_denied_builtin_reason, connector_export_effect_class,
45    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57    SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60    paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61    ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
67const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70    reqwest::Client::builder()
71        .user_agent(user_agent)
72        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73        .redirect(crate::egress::redirect_policy("connector_redirect", 10))
74        .build()
75        .expect("connector HTTP client configuration should be valid")
76}
77
78/// Shared owned handle to a connector instance registered with the runtime.
79pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
80
81thread_local! {
82    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
83        RefCell::new(BTreeMap::new());
84}
85
86/// Provider implementation contract for inbound connectors.
87#[async_trait]
88pub trait Connector: Send + Sync {
89    /// Stable provider id such as `github`, `slack`, or `webhook`.
90    fn provider_id(&self) -> &ProviderId;
91
92    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
93    fn kinds(&self) -> &[TriggerKind];
94
95    /// Called once per connector instance at orchestrator startup.
96    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
97
98    /// Activate the bindings relevant to this connector instance.
99    async fn activate(
100        &self,
101        bindings: &[TriggerBinding],
102    ) -> Result<ActivationHandle, ConnectorError>;
103
104    /// Stop connector-owned background work and flush any connector-local state.
105    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
106        Ok(())
107    }
108
109    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
110    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
111
112    /// Verify + normalize a provider-native inbound request into the richer
113    /// connector result contract used by ack-first webhook adapters.
114    async fn normalize_inbound_result(
115        &self,
116        raw: RawInbound,
117    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
118        self.normalize_inbound(raw)
119            .await
120            .map(ConnectorNormalizeResult::event)
121    }
122
123    /// Payload schema surfaced to future trigger-type narrowing.
124    fn payload_schema(&self) -> ProviderPayloadSchema;
125
126    /// Outbound API wrapper exposed to handlers.
127    fn client(&self) -> Arc<dyn ConnectorClient>;
128}
129
130/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
131#[derive(Clone, Debug, PartialEq, Eq)]
132pub struct ConnectorHttpResponse {
133    pub status: u16,
134    pub headers: BTreeMap<String, String>,
135    pub body: JsonValue,
136}
137
138impl ConnectorHttpResponse {
139    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
140        Self {
141            status,
142            headers,
143            body,
144        }
145    }
146}
147
148/// Normalized inbound result accepted by the runtime connector adapter.
149#[derive(Clone, Debug, PartialEq)]
150pub enum ConnectorNormalizeResult {
151    Event(Box<TriggerEvent>),
152    Batch(Vec<TriggerEvent>),
153    ImmediateResponse {
154        response: ConnectorHttpResponse,
155        events: Vec<TriggerEvent>,
156    },
157    Reject(ConnectorHttpResponse),
158}
159
160impl ConnectorNormalizeResult {
161    pub fn event(event: TriggerEvent) -> Self {
162        Self::Event(Box::new(event))
163    }
164
165    pub fn into_events(self) -> Vec<TriggerEvent> {
166        match self {
167            Self::Event(event) => vec![*event],
168            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
169            Self::Reject(_) => Vec::new(),
170        }
171    }
172}
173
174#[derive(Clone, Debug, PartialEq)]
175pub enum PostNormalizeOutcome {
176    Ready(Box<TriggerEvent>),
177    DuplicateDropped,
178}
179
180pub async fn postprocess_normalized_event(
181    inbox: &InboxIndex,
182    binding_id: &str,
183    dedupe_enabled: bool,
184    dedupe_ttl: StdDuration,
185    mut event: TriggerEvent,
186) -> Result<PostNormalizeOutcome, ConnectorError> {
187    if dedupe_enabled && !event.dedupe_claimed() {
188        if !inbox
189            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
190            .await?
191        {
192            return Ok(PostNormalizeOutcome::DuplicateDropped);
193        }
194        event.mark_dedupe_claimed();
195    }
196
197    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
198}
199
200/// Outbound provider client interface used by connector-backed stdlib modules.
201#[async_trait]
202pub trait ConnectorClient: Send + Sync {
203    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
204}
205
206/// Minimal outbound client errors shared by connector implementations.
207#[derive(Clone, Debug, PartialEq, Eq)]
208pub enum ClientError {
209    MethodNotFound(String),
210    InvalidArgs(String),
211    RateLimited(String),
212    Transport(String),
213    EgressBlocked(crate::egress::EgressBlocked),
214    Other(String),
215}
216
217impl fmt::Display for ClientError {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        match self {
220            Self::MethodNotFound(message)
221            | Self::InvalidArgs(message)
222            | Self::RateLimited(message)
223            | Self::Transport(message)
224            | Self::Other(message) => message.fmt(f),
225            Self::EgressBlocked(blocked) => blocked.fmt(f),
226        }
227    }
228}
229
230impl std::error::Error for ClientError {}
231
232/// Shared connector-layer errors.
233#[derive(Debug)]
234pub enum ConnectorError {
235    DuplicateProvider(String),
236    DuplicateDelivery(String),
237    UnknownProvider(String),
238    MissingHeader(String),
239    InvalidHeader {
240        name: String,
241        detail: String,
242    },
243    InvalidSignature(String),
244    TimestampOutOfWindow {
245        timestamp: OffsetDateTime,
246        now: OffsetDateTime,
247        window: time::Duration,
248    },
249    Json(String),
250    Secret(String),
251    EventLog(String),
252    HarnRuntime(String),
253    Client(ClientError),
254    Unsupported(String),
255    Activation(String),
256}
257
258impl ConnectorError {
259    pub fn invalid_signature(message: impl Into<String>) -> Self {
260        Self::InvalidSignature(message.into())
261    }
262}
263
264impl fmt::Display for ConnectorError {
265    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266        match self {
267            Self::DuplicateProvider(provider) => {
268                write!(f, "connector provider `{provider}` is already registered")
269            }
270            Self::DuplicateDelivery(message) => message.fmt(f),
271            Self::UnknownProvider(provider) => {
272                write!(f, "connector provider `{provider}` is not registered")
273            }
274            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
275            Self::InvalidHeader { name, detail } => {
276                write!(f, "invalid header `{name}`: {detail}")
277            }
278            Self::InvalidSignature(message)
279            | Self::Json(message)
280            | Self::Secret(message)
281            | Self::EventLog(message)
282            | Self::HarnRuntime(message)
283            | Self::Unsupported(message)
284            | Self::Activation(message) => message.fmt(f),
285            Self::TimestampOutOfWindow {
286                timestamp,
287                now,
288                window,
289            } => write!(
290                f,
291                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
292            ),
293            Self::Client(error) => error.fmt(f),
294        }
295    }
296}
297
298impl std::error::Error for ConnectorError {}
299
300impl From<crate::event_log::LogError> for ConnectorError {
301    fn from(value: crate::event_log::LogError) -> Self {
302        Self::EventLog(value.to_string())
303    }
304}
305
306impl From<crate::secrets::SecretError> for ConnectorError {
307    fn from(value: crate::secrets::SecretError) -> Self {
308        Self::Secret(value.to_string())
309    }
310}
311
312impl From<serde_json::Error> for ConnectorError {
313    fn from(value: serde_json::Error) -> Self {
314        Self::Json(value.to_string())
315    }
316}
317
318impl From<ClientError> for ConnectorError {
319    fn from(value: ClientError) -> Self {
320        Self::Client(value)
321    }
322}
323
324/// Startup context shared with connector instances.
325#[derive(Clone)]
326pub struct ConnectorCtx {
327    pub event_log: Arc<AnyEventLog>,
328    pub secrets: Arc<dyn SecretProvider>,
329    pub inbox: Arc<InboxIndex>,
330    pub metrics: Arc<MetricsRegistry>,
331    pub rate_limiter: Arc<RateLimiterFactory>,
332}
333
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Each field is a point-in-time copy of the same-named atomic counter in
/// [`MetricsRegistry`]; see [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    /// Last value stored via `set_inbox_active_entries` (a level, not a running total).
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
350
/// Label set for a metric series (label name -> label value), kept sorted.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Per-bucket observation counts, keyed by a string form of the bucket
    /// boundary (presumably — confirm in `observe_histogram`).
    buckets: BTreeMap<String, u64>,
    /// Total number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
359
360static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
361
362pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
363    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
364    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
365}
366
367pub fn clear_active_metrics_registry() {
368    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
369        *slot.lock().expect("active metrics registry poisoned") = None;
370    }
371}
372
373pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
374    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
375        slot.lock()
376            .expect("active metrics registry poisoned")
377            .clone()
378    })
379}
380
381/// Shared metrics surface for connector-local counters and timings.
382#[derive(Debug, Default)]
383pub struct MetricsRegistry {
384    inbox_claims_written: AtomicU64,
385    inbox_duplicates_rejected: AtomicU64,
386    inbox_fast_path_hits: AtomicU64,
387    inbox_durable_hits: AtomicU64,
388    inbox_expired_entries: AtomicU64,
389    inbox_active_entries: AtomicU64,
390    linear_timestamp_rejections_total: AtomicU64,
391    dispatch_succeeded_total: AtomicU64,
392    dispatch_failed_total: AtomicU64,
393    retry_scheduled_total: AtomicU64,
394    slack_delivery_success_total: AtomicU64,
395    slack_delivery_failure_total: AtomicU64,
396    custom_counters: Mutex<BTreeMap<String, u64>>,
397    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
398    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
399    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
400    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
401}
402
403impl MetricsRegistry {
404    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
405    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
406        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
407    ];
408    const SIZE_BUCKETS: [f64; 9] = [
409        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
410    ];
411
412    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
413        ConnectorMetricsSnapshot {
414            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
415            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
416            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
417            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
418            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
419            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
420            linear_timestamp_rejections_total: self
421                .linear_timestamp_rejections_total
422                .load(Ordering::Relaxed),
423            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
424            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
425            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
426            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
427            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
428        }
429    }
430
431    pub(crate) fn record_inbox_claim(&self) {
432        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
433    }
434
435    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
436        self.inbox_duplicates_rejected
437            .fetch_add(1, Ordering::Relaxed);
438        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
439    }
440
441    pub(crate) fn record_inbox_duplicate_durable(&self) {
442        self.inbox_duplicates_rejected
443            .fetch_add(1, Ordering::Relaxed);
444        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
445    }
446
447    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
448        if count > 0 {
449            self.inbox_expired_entries
450                .fetch_add(count, Ordering::Relaxed);
451        }
452    }
453
454    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
455        self.inbox_active_entries
456            .store(count as u64, Ordering::Relaxed);
457    }
458
459    pub fn record_linear_timestamp_rejection(&self) {
460        self.linear_timestamp_rejections_total
461            .fetch_add(1, Ordering::Relaxed);
462    }
463
464    pub fn record_dispatch_succeeded(&self) {
465        self.dispatch_succeeded_total
466            .fetch_add(1, Ordering::Relaxed);
467    }
468
469    pub fn record_dispatch_failed(&self) {
470        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
471    }
472
473    pub fn record_retry_scheduled(&self) {
474        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
475    }
476
477    pub fn record_slack_delivery_success(&self) {
478        self.slack_delivery_success_total
479            .fetch_add(1, Ordering::Relaxed);
480    }
481
482    pub fn record_slack_delivery_failure(&self) {
483        self.slack_delivery_failure_total
484            .fetch_add(1, Ordering::Relaxed);
485    }
486
487    pub fn record_custom_counter(&self, name: &str, amount: u64) {
488        if amount == 0 {
489            return;
490        }
491        let mut counters = self
492            .custom_counters
493            .lock()
494            .expect("custom counters poisoned");
495        *counters.entry(name.to_string()).or_default() += amount;
496    }
497
498    pub fn record_http_request(
499        &self,
500        endpoint: &str,
501        method: &str,
502        status: u16,
503        duration: StdDuration,
504        body_size_bytes: usize,
505    ) {
506        self.increment_counter(
507            "harn_http_requests_total",
508            labels([
509                ("endpoint", endpoint),
510                ("method", method),
511                ("status", &status.to_string()),
512            ]),
513            1,
514        );
515        self.observe_histogram(
516            "harn_http_request_duration_seconds",
517            labels([("endpoint", endpoint)]),
518            duration.as_secs_f64(),
519            &Self::DURATION_BUCKETS,
520        );
521        self.observe_histogram(
522            "harn_http_body_size_bytes",
523            labels([("endpoint", endpoint)]),
524            body_size_bytes as f64,
525            &Self::SIZE_BUCKETS,
526        );
527    }
528
529    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
530        self.increment_counter(
531            "harn_trigger_received_total",
532            labels([("trigger_id", trigger_id), ("provider", provider)]),
533            1,
534        );
535    }
536
537    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
538        self.increment_counter(
539            "harn_trigger_deduped_total",
540            labels([("trigger_id", trigger_id), ("reason", reason)]),
541            1,
542        );
543    }
544
545    pub fn record_trigger_predicate_evaluation(
546        &self,
547        trigger_id: &str,
548        result: bool,
549        cost_usd: f64,
550    ) {
551        self.increment_counter(
552            "harn_trigger_predicate_evaluations_total",
553            labels([
554                ("trigger_id", trigger_id),
555                ("result", if result { "true" } else { "false" }),
556            ]),
557            1,
558        );
559        self.observe_histogram(
560            "harn_trigger_predicate_cost_usd",
561            labels([("trigger_id", trigger_id)]),
562            cost_usd.max(0.0),
563            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
564        );
565    }
566
567    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
568        self.increment_counter(
569            "harn_trigger_dispatched_total",
570            labels([
571                ("trigger_id", trigger_id),
572                ("handler_kind", handler_kind),
573                ("outcome", outcome),
574            ]),
575            1,
576        );
577    }
578
579    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
580        self.increment_counter(
581            "harn_trigger_retries_total",
582            labels([
583                ("trigger_id", trigger_id),
584                ("attempt", &attempt.to_string()),
585            ]),
586            1,
587        );
588    }
589
590    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
591        self.increment_counter(
592            "harn_trigger_dlq_total",
593            labels([("trigger_id", trigger_id), ("reason", reason)]),
594            1,
595        );
596    }
597
598    pub fn record_trigger_accepted_to_normalized(
599        &self,
600        trigger_id: &str,
601        binding_key: &str,
602        provider: &str,
603        tenant_id: Option<&str>,
604        status: &str,
605        duration: StdDuration,
606    ) {
607        self.observe_histogram(
608            "harn_trigger_webhook_accepted_to_normalized_seconds",
609            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
610            duration.as_secs_f64(),
611            &Self::TRIGGER_LATENCY_BUCKETS,
612        );
613    }
614
615    pub fn record_trigger_accepted_to_queue_append(
616        &self,
617        trigger_id: &str,
618        binding_key: &str,
619        provider: &str,
620        tenant_id: Option<&str>,
621        status: &str,
622        duration: StdDuration,
623    ) {
624        self.observe_histogram(
625            "harn_trigger_webhook_accepted_to_queue_append_seconds",
626            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
627            duration.as_secs_f64(),
628            &Self::TRIGGER_LATENCY_BUCKETS,
629        );
630    }
631
632    pub fn record_trigger_queue_age_at_dispatch_admission(
633        &self,
634        trigger_id: &str,
635        binding_key: &str,
636        provider: &str,
637        tenant_id: Option<&str>,
638        status: &str,
639        age: StdDuration,
640    ) {
641        self.observe_histogram(
642            "harn_trigger_queue_age_at_dispatch_admission_seconds",
643            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
644            age.as_secs_f64(),
645            &Self::TRIGGER_LATENCY_BUCKETS,
646        );
647    }
648
649    pub fn record_trigger_queue_age_at_dispatch_start(
650        &self,
651        trigger_id: &str,
652        binding_key: &str,
653        provider: &str,
654        tenant_id: Option<&str>,
655        status: &str,
656        age: StdDuration,
657    ) {
658        self.observe_histogram(
659            "harn_trigger_queue_age_at_dispatch_start_seconds",
660            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
661            age.as_secs_f64(),
662            &Self::TRIGGER_LATENCY_BUCKETS,
663        );
664    }
665
666    pub fn record_trigger_dispatch_runtime(
667        &self,
668        trigger_id: &str,
669        binding_key: &str,
670        provider: &str,
671        tenant_id: Option<&str>,
672        status: &str,
673        duration: StdDuration,
674    ) {
675        self.observe_histogram(
676            "harn_trigger_dispatch_runtime_seconds",
677            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
678            duration.as_secs_f64(),
679            &Self::TRIGGER_LATENCY_BUCKETS,
680        );
681    }
682
683    pub fn record_trigger_retry_delay(
684        &self,
685        trigger_id: &str,
686        binding_key: &str,
687        provider: &str,
688        tenant_id: Option<&str>,
689        status: &str,
690        duration: StdDuration,
691    ) {
692        self.observe_histogram(
693            "harn_trigger_retry_delay_seconds",
694            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
695            duration.as_secs_f64(),
696            &Self::TRIGGER_LATENCY_BUCKETS,
697        );
698    }
699
700    pub fn record_trigger_accepted_to_dlq(
701        &self,
702        trigger_id: &str,
703        binding_key: &str,
704        provider: &str,
705        tenant_id: Option<&str>,
706        status: &str,
707        duration: StdDuration,
708    ) {
709        self.observe_histogram(
710            "harn_trigger_accepted_to_dlq_seconds",
711            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
712            duration.as_secs_f64(),
713            &Self::TRIGGER_LATENCY_BUCKETS,
714        );
715    }
716
717    pub fn note_trigger_pending_event(
718        &self,
719        event_id: &str,
720        trigger_id: &str,
721        binding_key: &str,
722        provider: &str,
723        tenant_id: Option<&str>,
724        accepted_at_ms: i64,
725        now_ms: i64,
726    ) {
727        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
728        {
729            let mut pending = self
730                .pending_trigger_events
731                .lock()
732                .expect("pending trigger events poisoned");
733            pending
734                .entry(labels.clone())
735                .or_default()
736                .insert(event_id.to_string(), accepted_at_ms);
737        }
738        self.refresh_oldest_pending_gauge(labels, now_ms);
739    }
740
741    pub fn clear_trigger_pending_event(
742        &self,
743        event_id: &str,
744        trigger_id: &str,
745        binding_key: &str,
746        provider: &str,
747        tenant_id: Option<&str>,
748        now_ms: i64,
749    ) {
750        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
751        {
752            let mut pending = self
753                .pending_trigger_events
754                .lock()
755                .expect("pending trigger events poisoned");
756            if let Some(events) = pending.get_mut(&labels) {
757                events.remove(event_id);
758                if events.is_empty() {
759                    pending.remove(&labels);
760                }
761            }
762        }
763        self.refresh_oldest_pending_gauge(labels, now_ms);
764    }
765
766    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
767        self.set_gauge(
768            "harn_trigger_inflight",
769            labels([("trigger_id", trigger_id)]),
770            count as f64,
771        );
772    }
773
774    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
775        self.set_gauge(
776            "harn_trigger_budget_cost_today_usd",
777            labels([("trigger_id", trigger_id)]),
778            cost_usd.max(0.0),
779        );
780    }
781
782    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
783        self.increment_counter(
784            "harn_trigger_budget_exhausted_total",
785            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
786            1,
787        );
788    }
789
790    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
791        self.increment_counter(
792            "harn_backpressure_events_total",
793            labels([("dimension", dimension), ("action", action)]),
794            1,
795        );
796    }
797
798    pub fn record_event_log_append(
799        &self,
800        topic: &str,
801        duration: StdDuration,
802        payload_bytes: usize,
803    ) {
804        self.observe_histogram(
805            "harn_event_log_append_duration_seconds",
806            labels([("topic", topic)]),
807            duration.as_secs_f64(),
808            &Self::DURATION_BUCKETS,
809        );
810        self.set_gauge(
811            "harn_event_log_topic_size_bytes",
812            labels([("topic", topic)]),
813            payload_bytes as f64,
814        );
815    }
816
817    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
818        self.set_gauge(
819            "harn_event_log_consumer_lag",
820            labels([("topic", topic), ("consumer", consumer)]),
821            lag as f64,
822        );
823    }
824
825    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
826        self.increment_counter(
827            "harn_a2a_hops_total",
828            labels([("target", target), ("outcome", outcome)]),
829            1,
830        );
831        self.observe_histogram(
832            "harn_a2a_hop_duration_seconds",
833            labels([("target", target)]),
834            duration.as_secs_f64(),
835            &Self::DURATION_BUCKETS,
836        );
837    }
838
839    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
840        self.set_gauge(
841            "harn_worker_queue_depth",
842            labels([("queue", queue)]),
843            depth as f64,
844        );
845    }
846
847    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
848        self.observe_histogram(
849            "harn_worker_queue_claim_age_seconds",
850            labels([("queue", queue)]),
851            age_seconds.max(0.0),
852            &Self::DURATION_BUCKETS,
853        );
854    }
855
856    /// Increment the scheduler-selection counter for a particular fairness key.
857    pub fn record_scheduler_selection(
858        &self,
859        queue: &str,
860        fairness_dimension: &str,
861        fairness_key: &str,
862    ) {
863        self.increment_counter(
864            "harn_scheduler_selections_total",
865            labels([
866                ("queue", queue),
867                ("fairness_dimension", fairness_dimension),
868                ("fairness_key", fairness_key),
869            ]),
870            1,
871        );
872    }
873
874    /// Increment the scheduler-deferred counter (queue had work but couldn't
875    /// be selected because the key was at its concurrency cap).
876    pub fn record_scheduler_deferral(
877        &self,
878        queue: &str,
879        fairness_dimension: &str,
880        fairness_key: &str,
881    ) {
882        self.increment_counter(
883            "harn_scheduler_deferrals_total",
884            labels([
885                ("queue", queue),
886                ("fairness_dimension", fairness_dimension),
887                ("fairness_key", fairness_key),
888            ]),
889            1,
890        );
891    }
892
893    /// Increment the scheduler starvation-promotion counter.
894    pub fn record_scheduler_starvation_promotion(
895        &self,
896        queue: &str,
897        fairness_dimension: &str,
898        fairness_key: &str,
899    ) {
900        self.increment_counter(
901            "harn_scheduler_starvation_promotions_total",
902            labels([
903                ("queue", queue),
904                ("fairness_dimension", fairness_dimension),
905                ("fairness_key", fairness_key),
906            ]),
907            1,
908        );
909    }
910
911    /// Set the current scheduler deficit gauge for a fairness key.
912    pub fn set_scheduler_deficit(
913        &self,
914        queue: &str,
915        fairness_dimension: &str,
916        fairness_key: &str,
917        deficit: i64,
918    ) {
919        self.set_gauge(
920            "harn_scheduler_deficit",
921            labels([
922                ("queue", queue),
923                ("fairness_dimension", fairness_dimension),
924                ("fairness_key", fairness_key),
925            ]),
926            deficit as f64,
927        );
928    }
929
930    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
931    pub fn set_scheduler_oldest_eligible_age(
932        &self,
933        queue: &str,
934        fairness_dimension: &str,
935        fairness_key: &str,
936        age_ms: u64,
937    ) {
938        self.set_gauge(
939            "harn_scheduler_oldest_eligible_age_seconds",
940            labels([
941                ("queue", queue),
942                ("fairness_dimension", fairness_dimension),
943                ("fairness_key", fairness_key),
944            ]),
945            age_ms as f64 / 1000.0,
946        );
947    }
948
949    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
950        self.set_gauge(
951            "harn_orchestrator_pump_backlog",
952            labels([("topic", topic)]),
953            count as f64,
954        );
955    }
956
957    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
958        self.set_gauge(
959            "harn_orchestrator_pump_outstanding",
960            labels([("topic", topic)]),
961            count as f64,
962        );
963    }
964
965    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
966        self.observe_histogram(
967            "harn_orchestrator_pump_admission_delay_seconds",
968            labels([("topic", topic)]),
969            duration.as_secs_f64(),
970            &Self::DURATION_BUCKETS,
971        );
972    }
973
974    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
975        self.increment_counter(
976            "harn_llm_calls_total",
977            labels([
978                ("provider", provider),
979                ("model", model),
980                ("outcome", outcome),
981            ]),
982            1,
983        );
984        if cost_usd > 0.0 {
985            self.increment_counter(
986                "harn_llm_cost_usd_total",
987                labels([("provider", provider), ("model", model)]),
988                cost_usd,
989            );
990        } else {
991            self.ensure_counter(
992                "harn_llm_cost_usd_total",
993                labels([("provider", provider), ("model", model)]),
994            );
995        }
996    }
997
998    pub fn record_llm_cache_hit(&self, provider: &str) {
999        self.increment_counter(
1000            "harn_llm_cache_hits_total",
1001            labels([("provider", provider)]),
1002            1,
1003        );
1004    }
1005
1006    /// Track LLM streaming responses aborted mid-stream by
1007    /// `schema_stream_abort` because the partial JSON cannot satisfy
1008    /// `output_schema`. Counter is labelled by `(provider, model)` so
1009    /// dashboards can attribute the savings — each abort short-circuits
1010    /// a provider stream that would otherwise have run to completion.
1011    pub fn record_schema_stream_aborted(&self, provider: &str, model: &str) {
1012        self.increment_counter(
1013            "harn_llm_schema_stream_aborted_total",
1014            labels([("provider", provider), ("model", model)]),
1015            1,
1016        );
1017    }
1018
1019    pub fn render_prometheus(&self) -> String {
1020        let snapshot = self.snapshot();
1021        let counters = [
1022            (
1023                "connector_linear_timestamp_rejections_total",
1024                snapshot.linear_timestamp_rejections_total,
1025            ),
1026            (
1027                "dispatch_succeeded_total",
1028                snapshot.dispatch_succeeded_total,
1029            ),
1030            ("dispatch_failed_total", snapshot.dispatch_failed_total),
1031            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1032            ("retry_scheduled_total", snapshot.retry_scheduled_total),
1033            (
1034                "slack_events_delivery_success_total",
1035                snapshot.slack_delivery_success_total,
1036            ),
1037            (
1038                "slack_events_delivery_failure_total",
1039                snapshot.slack_delivery_failure_total,
1040            ),
1041        ];
1042
1043        let mut rendered = String::new();
1044        for (name, value) in counters {
1045            rendered.push_str("# TYPE ");
1046            rendered.push_str(name);
1047            rendered.push_str(" counter\n");
1048            rendered.push_str(name);
1049            rendered.push(' ');
1050            rendered.push_str(&value.to_string());
1051            rendered.push('\n');
1052        }
1053        let custom_counters = self
1054            .custom_counters
1055            .lock()
1056            .expect("custom counters poisoned");
1057        for (name, value) in custom_counters.iter() {
1058            let metric_name = format!(
1059                "connector_custom_{}_total",
1060                name.chars()
1061                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1062                        ch
1063                    } else {
1064                        '_'
1065                    })
1066                    .collect::<String>()
1067            );
1068            rendered.push_str("# TYPE ");
1069            rendered.push_str(&metric_name);
1070            rendered.push_str(" counter\n");
1071            rendered.push_str(&metric_name);
1072            rendered.push(' ');
1073            rendered.push_str(&value.to_string());
1074            rendered.push('\n');
1075        }
1076        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1077        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1078        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1079        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1080        self.render_generic_metrics(&mut rendered);
1081        rendered
1082    }
1083
1084    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1085        let amount = amount.into();
1086        if amount <= 0.0 || !amount.is_finite() {
1087            return;
1088        }
1089        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1090        *counters.entry((name.to_string(), labels)).or_default() += amount;
1091    }
1092
1093    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1094        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1095        counters.entry((name.to_string(), labels)).or_default();
1096    }
1097
1098    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1099        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1100        gauges.insert((name.to_string(), labels), value);
1101    }
1102
1103    fn observe_histogram(
1104        &self,
1105        name: &str,
1106        labels: MetricLabels,
1107        value: f64,
1108        bucket_bounds: &[f64],
1109    ) {
1110        if !value.is_finite() {
1111            return;
1112        }
1113        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1114        let histogram = histograms
1115            .entry((name.to_string(), labels))
1116            .or_insert_with(|| HistogramMetric {
1117                buckets: bucket_bounds
1118                    .iter()
1119                    .map(|bound| (prometheus_float(*bound), 0))
1120                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1121                    .collect(),
1122                count: 0,
1123                sum: 0.0,
1124            });
1125        histogram.count += 1;
1126        histogram.sum += value;
1127        for bound in bucket_bounds {
1128            if value <= *bound {
1129                let key = prometheus_float(*bound);
1130                *histogram.buckets.entry(key).or_default() += 1;
1131            }
1132        }
1133        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1134    }
1135
1136    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1137        let oldest_accepted_at_ms = self
1138            .pending_trigger_events
1139            .lock()
1140            .expect("pending trigger events poisoned")
1141            .get(&labels)
1142            .and_then(|events| events.values().min().copied());
1143        let age_seconds = oldest_accepted_at_ms
1144            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1145            .unwrap_or(0.0);
1146        self.set_gauge(
1147            "harn_trigger_oldest_pending_age_seconds",
1148            labels,
1149            age_seconds,
1150        );
1151    }
1152
1153    fn render_generic_metrics(&self, rendered: &mut String) {
1154        let counters = self
1155            .counters
1156            .lock()
1157            .expect("metrics counters poisoned")
1158            .clone();
1159        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1160        let histograms = self
1161            .histograms
1162            .lock()
1163            .expect("metrics histograms poisoned")
1164            .clone();
1165
1166        for name in metric_family_names(MetricKind::Counter) {
1167            rendered.push_str("# TYPE ");
1168            rendered.push_str(name);
1169            rendered.push_str(" counter\n");
1170            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1171                render_sample(rendered, sample_name, labels, *value);
1172            }
1173        }
1174        for name in metric_family_names(MetricKind::Gauge) {
1175            rendered.push_str("# TYPE ");
1176            rendered.push_str(name);
1177            rendered.push_str(" gauge\n");
1178            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1179                render_sample(rendered, sample_name, labels, *value);
1180            }
1181        }
1182        for name in metric_family_names(MetricKind::Histogram) {
1183            rendered.push_str("# TYPE ");
1184            rendered.push_str(name);
1185            rendered.push_str(" histogram\n");
1186            for ((sample_name, labels), histogram) in
1187                histograms.iter().filter(|((n, _), _)| n == name)
1188            {
1189                for (le, value) in &histogram.buckets {
1190                    let mut bucket_labels = labels.clone();
1191                    bucket_labels.insert("le".to_string(), le.clone());
1192                    render_sample(
1193                        rendered,
1194                        &format!("{sample_name}_bucket"),
1195                        &bucket_labels,
1196                        *value as f64,
1197                    );
1198                }
1199                render_sample(
1200                    rendered,
1201                    &format!("{sample_name}_sum"),
1202                    labels,
1203                    histogram.sum,
1204                );
1205                render_sample(
1206                    rendered,
1207                    &format!("{sample_name}_count"),
1208                    labels,
1209                    histogram.count as f64,
1210                );
1211            }
1212        }
1213    }
1214}
1215
/// Selects which family list `metric_family_names` returns. Each variant
/// corresponds to the Prometheus `# TYPE` that `render_generic_metrics` writes
/// for those families.
#[derive(Copy, Clone)]
enum MetricKind {
    /// Families rendered as `counter`.
    Counter,
    /// Families rendered as `gauge`.
    Gauge,
    /// Families rendered as `histogram`.
    Histogram,
}
1222
1223fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
1224    match kind {
1225        MetricKind::Counter => &[
1226            "harn_http_requests_total",
1227            "harn_trigger_received_total",
1228            "harn_trigger_deduped_total",
1229            "harn_trigger_predicate_evaluations_total",
1230            "harn_trigger_dispatched_total",
1231            "harn_trigger_retries_total",
1232            "harn_trigger_dlq_total",
1233            "harn_trigger_budget_exhausted_total",
1234            "harn_backpressure_events_total",
1235            "harn_a2a_hops_total",
1236            "harn_llm_calls_total",
1237            "harn_llm_cost_usd_total",
1238            "harn_llm_cache_hits_total",
1239            "harn_llm_schema_stream_aborted_total",
1240            "harn_scheduler_selections_total",
1241            "harn_scheduler_deferrals_total",
1242            "harn_scheduler_starvation_promotions_total",
1243        ],
1244        MetricKind::Gauge => &[
1245            "harn_trigger_inflight",
1246            "harn_event_log_topic_size_bytes",
1247            "harn_event_log_consumer_lag",
1248            "harn_trigger_budget_cost_today_usd",
1249            "harn_worker_queue_depth",
1250            "harn_orchestrator_pump_backlog",
1251            "harn_orchestrator_pump_outstanding",
1252            "harn_trigger_oldest_pending_age_seconds",
1253            "harn_scheduler_deficit",
1254            "harn_scheduler_oldest_eligible_age_seconds",
1255        ],
1256        MetricKind::Histogram => &[
1257            "harn_http_request_duration_seconds",
1258            "harn_http_body_size_bytes",
1259            "harn_trigger_predicate_cost_usd",
1260            "harn_event_log_append_duration_seconds",
1261            "harn_a2a_hop_duration_seconds",
1262            "harn_worker_queue_claim_age_seconds",
1263            "harn_orchestrator_pump_admission_delay_seconds",
1264            "harn_trigger_webhook_accepted_to_normalized_seconds",
1265            "harn_trigger_webhook_accepted_to_queue_append_seconds",
1266            "harn_trigger_queue_age_at_dispatch_admission_seconds",
1267            "harn_trigger_queue_age_at_dispatch_start_seconds",
1268            "harn_trigger_dispatch_runtime_seconds",
1269            "harn_trigger_retry_delay_seconds",
1270            "harn_trigger_accepted_to_dlq_seconds",
1271        ],
1272    }
1273}
1274
1275fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1276    pairs
1277        .into_iter()
1278        .map(|(name, value)| (name.to_string(), value.to_string()))
1279        .collect()
1280}
1281
1282fn trigger_lifecycle_labels(
1283    trigger_id: &str,
1284    binding_key: &str,
1285    provider: &str,
1286    tenant_id: Option<&str>,
1287    status: &str,
1288) -> MetricLabels {
1289    labels([
1290        ("binding_key", binding_key),
1291        ("provider", provider),
1292        ("status", status),
1293        ("tenant_id", tenant_label(tenant_id)),
1294        ("trigger_id", trigger_id),
1295    ])
1296}
1297
1298fn trigger_pending_labels(
1299    trigger_id: &str,
1300    binding_key: &str,
1301    provider: &str,
1302    tenant_id: Option<&str>,
1303) -> MetricLabels {
1304    labels([
1305        ("binding_key", binding_key),
1306        ("provider", provider),
1307        ("tenant_id", tenant_label(tenant_id)),
1308        ("trigger_id", trigger_id),
1309    ])
1310}
1311
/// Normalize an optional tenant id into a label value.
///
/// Surrounding whitespace is trimmed. A missing or blank id becomes `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1318
/// Return `later_ms - earlier_ms` (both in milliseconds) as a `Duration`.
///
/// The subtraction saturates, and a negative difference (when `earlier_ms` is
/// after `later_ms`) is clamped to zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(elapsed_ms).unwrap_or(0))
}
1322
1323fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1324    rendered.push_str(name);
1325    if !labels.is_empty() {
1326        rendered.push('{');
1327        for (index, (label, label_value)) in labels.iter().enumerate() {
1328            if index > 0 {
1329                rendered.push(',');
1330            }
1331            rendered.push_str(label);
1332            rendered.push_str("=\"");
1333            rendered.push_str(&escape_label_value(label_value));
1334            rendered.push('"');
1335        }
1336        rendered.push('}');
1337    }
1338    rendered.push(' ');
1339    rendered.push_str(&prometheus_float(value));
1340    rendered.push('\n');
1341}
1342
/// Escape a label value for the Prometheus text exposition format.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`. Every
/// other character is copied through unchanged.
fn escape_label_value(value: &str) -> String {
    // Write into one pre-sized buffer instead of allocating a `Vec` for every
    // character; most label values contain nothing that needs escaping.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1354
/// Format `value` as a Prometheus sample value or `le` bound.
///
/// Special values use the exposition spellings `+Inf`, `-Inf` and `NaN`.
/// Integral values are printed without a fractional part. Everything else is
/// rounded to six decimal places with trailing zeros (and a trailing `.`)
/// removed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Rust's formatter spells these `inf`/`-inf`; Prometheus expects
        // `+Inf`/`-Inf`.
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1369
1370/// Provider payload schema metadata exposed by a connector.
1371#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1372pub struct ProviderPayloadSchema {
1373    pub harn_schema_name: String,
1374    #[serde(default)]
1375    pub json_schema: JsonValue,
1376}
1377
1378impl ProviderPayloadSchema {
1379    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1380        Self {
1381            harn_schema_name: harn_schema_name.into(),
1382            json_schema,
1383        }
1384    }
1385
1386    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1387        Self::new(harn_schema_name, JsonValue::Null)
1388    }
1389}
1390
1391impl Default for ProviderPayloadSchema {
1392    fn default() -> Self {
1393        Self::named("raw")
1394    }
1395}
1396
1397/// High-level transport kind a connector supports.
1398#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1399#[serde(transparent)]
1400pub struct TriggerKind(String);
1401
1402impl TriggerKind {
1403    pub fn new(value: impl Into<String>) -> Self {
1404        Self(value.into())
1405    }
1406
1407    pub fn as_str(&self) -> &str {
1408        self.0.as_str()
1409    }
1410}
1411
1412impl From<&str> for TriggerKind {
1413    fn from(value: &str) -> Self {
1414        Self::new(value)
1415    }
1416}
1417
1418impl From<String> for TriggerKind {
1419    fn from(value: String) -> Self {
1420        Self::new(value)
1421    }
1422}
1423
1424/// Future trigger manifest binding routed to a connector activation.
1425#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1426pub struct TriggerBinding {
1427    pub provider: ProviderId,
1428    pub kind: TriggerKind,
1429    pub binding_id: String,
1430    #[serde(default)]
1431    pub dedupe_key: Option<String>,
1432    #[serde(default = "default_dedupe_retention_days")]
1433    pub dedupe_retention_days: u32,
1434    #[serde(default)]
1435    pub config: JsonValue,
1436}
1437
1438impl TriggerBinding {
1439    pub fn new(
1440        provider: ProviderId,
1441        kind: impl Into<TriggerKind>,
1442        binding_id: impl Into<String>,
1443    ) -> Self {
1444        Self {
1445            provider,
1446            kind: kind.into(),
1447            binding_id: binding_id.into(),
1448            dedupe_key: None,
1449            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1450            config: JsonValue::Null,
1451        }
1452    }
1453}
1454
1455fn default_dedupe_retention_days() -> u32 {
1456    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1457}
1458
1459/// Small in-memory trigger-binding registry used to fan bindings into connectors.
1460#[derive(Clone, Debug, Default)]
1461pub struct TriggerRegistry {
1462    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1463}
1464
1465impl TriggerRegistry {
1466    pub fn register(&mut self, binding: TriggerBinding) {
1467        self.bindings
1468            .entry(binding.provider.clone())
1469            .or_default()
1470            .push(binding);
1471    }
1472
1473    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1474        &self.bindings
1475    }
1476
1477    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1478        self.bindings
1479            .get(provider)
1480            .map(Vec::as_slice)
1481            .unwrap_or(&[])
1482    }
1483}
1484
1485/// Metadata returned from connector activation.
1486#[derive(Clone, Debug, PartialEq, Eq)]
1487pub struct ActivationHandle {
1488    pub provider: ProviderId,
1489    pub binding_count: usize,
1490}
1491
1492impl ActivationHandle {
1493    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1494        Self {
1495            provider,
1496            binding_count,
1497        }
1498    }
1499}
1500
1501/// Provider-native inbound request payload preserved as raw bytes.
1502#[derive(Clone, Debug, PartialEq, Eq)]
1503pub struct RawInbound {
1504    pub kind: String,
1505    pub headers: BTreeMap<String, String>,
1506    pub query: BTreeMap<String, String>,
1507    pub body: Vec<u8>,
1508    pub received_at: OffsetDateTime,
1509    pub occurred_at: Option<OffsetDateTime>,
1510    pub tenant_id: Option<TenantId>,
1511    pub metadata: JsonValue,
1512}
1513
1514impl RawInbound {
1515    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1516        Self {
1517            kind: kind.into(),
1518            headers,
1519            query: BTreeMap::new(),
1520            body,
1521            received_at: clock::now_utc(),
1522            occurred_at: None,
1523            tenant_id: None,
1524            metadata: JsonValue::Null,
1525        }
1526    }
1527
1528    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1529        Ok(serde_json::from_slice(&self.body)?)
1530    }
1531}
1532
/// Token-bucket configuration shared across connector clients.
///
/// A bucket holds at most `capacity` tokens and refills continuously at a rate
/// of `refill_tokens` per `refill_interval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// A 60-token burst, refilled at one token per second.
    fn default() -> Self {
        let refill_interval = StdDuration::from_secs(1);
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval,
        }
    }
}
1550
1551#[derive(Clone, Debug)]
1552struct TokenBucket {
1553    tokens: f64,
1554    last_refill: ClockInstant,
1555}
1556
1557impl TokenBucket {
1558    fn full(config: RateLimitConfig) -> Self {
1559        Self {
1560            tokens: config.capacity as f64,
1561            last_refill: clock::instant_now(),
1562        }
1563    }
1564
1565    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1566        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1567        let rate = config.refill_tokens.max(1) as f64 / interval;
1568        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1569        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1570        self.last_refill = now;
1571    }
1572
1573    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1574        self.refill(config, now);
1575        if self.tokens >= 1.0 {
1576            self.tokens -= 1.0;
1577            true
1578        } else {
1579            false
1580        }
1581    }
1582
1583    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1584        if self.tokens >= 1.0 {
1585            return StdDuration::ZERO;
1586        }
1587        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1588        let rate = config.refill_tokens.max(1) as f64 / interval;
1589        let missing = (1.0 - self.tokens).max(0.0);
1590        StdDuration::from_secs_f64((missing / rate).max(0.001))
1591    }
1592}
1593
1594/// Shared per-provider, per-key token bucket factory for outbound connector clients.
1595#[derive(Debug)]
1596pub struct RateLimiterFactory {
1597    config: RateLimitConfig,
1598    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1599}
1600
1601impl RateLimiterFactory {
1602    pub fn new(config: RateLimitConfig) -> Self {
1603        Self {
1604            config,
1605            buckets: Mutex::new(HashMap::new()),
1606        }
1607    }
1608
1609    pub fn config(&self) -> RateLimitConfig {
1610        self.config
1611    }
1612
1613    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1614        ScopedRateLimiter {
1615            factory: self,
1616            provider: provider.clone(),
1617            key: key.into(),
1618        }
1619    }
1620
1621    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1622        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1623        let bucket = buckets
1624            .entry((provider.as_str().to_string(), key.to_string()))
1625            .or_insert_with(|| TokenBucket::full(self.config));
1626        bucket.try_acquire(self.config, clock::instant_now())
1627    }
1628
1629    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1630        loop {
1631            let wait = {
1632                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1633                let bucket = buckets
1634                    .entry((provider.as_str().to_string(), key.to_string()))
1635                    .or_insert_with(|| TokenBucket::full(self.config));
1636                if bucket.try_acquire(self.config, clock::instant_now()) {
1637                    return;
1638                }
1639                bucket.wait_duration(self.config)
1640            };
1641            // Honor the unified mock clock so tests that pin time via
1642            // `mock_time(...)` don't deadlock here: the bucket reads
1643            // `instant_now()` (mocked), and this sleep advances the same
1644            // mock instead of waiting on a wall-clock that never moves.
1645            clock::sleep(wait).await;
1646        }
1647    }
1648}
1649
1650impl Default for RateLimiterFactory {
1651    fn default() -> Self {
1652        Self::new(RateLimitConfig::default())
1653    }
1654}
1655
1656/// Borrowed view onto a single provider/key rate-limit scope.
1657#[derive(Clone, Debug)]
1658pub struct ScopedRateLimiter<'a> {
1659    factory: &'a RateLimiterFactory,
1660    provider: ProviderId,
1661    key: String,
1662}
1663
1664impl<'a> ScopedRateLimiter<'a> {
1665    pub fn try_acquire(&self) -> bool {
1666        self.factory.try_acquire(&self.provider, &self.key)
1667    }
1668
1669    pub async fn acquire(&self) {
1670        self.factory.acquire(&self.provider, &self.key).await;
1671    }
1672}
1673
1674/// Runtime connector registry keyed by provider id.
1675pub struct ConnectorRegistry {
1676    connectors: BTreeMap<ProviderId, ConnectorHandle>,
1677}
1678
1679impl ConnectorRegistry {
1680    pub fn empty() -> Self {
1681        Self {
1682            connectors: BTreeMap::new(),
1683        }
1684    }
1685
1686    pub fn with_defaults() -> Self {
1687        let mut registry = Self::empty();
1688        for provider in registered_provider_metadata() {
1689            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1690                continue;
1691            }
1692            registry
1693                .register(default_connector_for_provider(&provider))
1694                .expect("default connector registration should not fail");
1695        }
1696        registry
1697    }
1698
1699    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1700        let provider = connector.provider_id().clone();
1701        if self.connectors.contains_key(&provider) {
1702            return Err(ConnectorError::DuplicateProvider(provider.0));
1703        }
1704        self.connectors
1705            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1706        Ok(())
1707    }
1708
1709    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1710        self.connectors.get(id).cloned()
1711    }
1712
1713    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1714        self.connectors.remove(id)
1715    }
1716
1717    pub fn list(&self) -> Vec<ProviderId> {
1718        self.connectors.keys().cloned().collect()
1719    }
1720
1721    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1722        for connector in self.connectors.values() {
1723            connector.lock().await.init(ctx.clone()).await?;
1724        }
1725        Ok(())
1726    }
1727
1728    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1729        let mut clients = BTreeMap::new();
1730        for (provider, connector) in &self.connectors {
1731            let client = connector.lock().await.client();
1732            clients.insert(provider.clone(), client);
1733        }
1734        clients
1735    }
1736
1737    pub async fn activate_all(
1738        &self,
1739        registry: &TriggerRegistry,
1740    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1741        let mut handles = Vec::new();
1742        for (provider, connector) in &self.connectors {
1743            let bindings = registry.bindings_for(provider);
1744            if bindings.is_empty() {
1745                continue;
1746            }
1747            let connector = connector.lock().await;
1748            handles.push(connector.activate(bindings).await?);
1749        }
1750        Ok(handles)
1751    }
1752}
1753
1754impl Default for ConnectorRegistry {
1755    fn default() -> Self {
1756        Self::with_defaults()
1757    }
1758}
1759
1760fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1761    match &provider.runtime {
1762        ProviderRuntimeMetadata::Builtin {
1763            connector,
1764            default_signature_variant,
1765        } => match connector.as_str() {
1766            "a2a-push" => Box::new(A2aPushConnector::new()),
1767            "cron" => Box::new(CronConnector::new()),
1768            "stream" => Box::new(StreamConnector::new(
1769                ProviderId::from(provider.provider.clone()),
1770                provider.schema_name.clone(),
1771            )),
1772            "webhook" => {
1773                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1774                    .expect("catalog webhook signature variant must be valid");
1775                Box::new(GenericWebhookConnector::with_profile(
1776                    WebhookProviderProfile::new(
1777                        ProviderId::from(provider.provider.clone()),
1778                        provider.schema_name.clone(),
1779                        variant,
1780                    ),
1781                ))
1782            }
1783            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1784        },
1785        ProviderRuntimeMetadata::Placeholder => {
1786            Box::new(PlaceholderConnector::from_metadata(provider))
1787        }
1788    }
1789}
1790
1791struct PlaceholderConnector {
1792    provider_id: ProviderId,
1793    kinds: Vec<TriggerKind>,
1794    schema_name: String,
1795}
1796
1797impl PlaceholderConnector {
1798    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1799        Self {
1800            provider_id: ProviderId::from(metadata.provider.clone()),
1801            kinds: metadata
1802                .kinds
1803                .iter()
1804                .cloned()
1805                .map(TriggerKind::from)
1806                .collect(),
1807            schema_name: metadata.schema_name.clone(),
1808        }
1809    }
1810}
1811
1812struct PlaceholderClient;
1813
1814#[async_trait]
1815impl ConnectorClient for PlaceholderClient {
1816    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1817        Err(ClientError::Other(format!(
1818            "connector client method '{method}' is not implemented for this provider"
1819        )))
1820    }
1821}
1822
1823#[async_trait]
1824impl Connector for PlaceholderConnector {
1825    fn provider_id(&self) -> &ProviderId {
1826        &self.provider_id
1827    }
1828
1829    fn kinds(&self) -> &[TriggerKind] {
1830        &self.kinds
1831    }
1832
1833    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1834        Ok(())
1835    }
1836
1837    async fn activate(
1838        &self,
1839        bindings: &[TriggerBinding],
1840    ) -> Result<ActivationHandle, ConnectorError> {
1841        Ok(ActivationHandle::new(
1842            self.provider_id.clone(),
1843            bindings.len(),
1844        ))
1845    }
1846
1847    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1848        Err(ConnectorError::Unsupported(format!(
1849            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1850            self.provider_id.as_str()
1851        )))
1852    }
1853
1854    fn payload_schema(&self) -> ProviderPayloadSchema {
1855        ProviderPayloadSchema::named(self.schema_name.clone())
1856    }
1857
1858    fn client(&self) -> Arc<dyn ConnectorClient> {
1859        Arc::new(PlaceholderClient)
1860    }
1861}
1862
1863pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1864    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1865        *slot.borrow_mut() = clients
1866            .into_iter()
1867            .map(|(provider, client)| (provider.as_str().to_string(), client))
1868            .collect();
1869    });
1870}
1871
1872pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1873    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1874}
1875
1876pub fn clear_active_connector_clients() {
1877    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1878}
1879
#[cfg(test)]
mod tests {
    //! Unit tests for the connector registry, the rate limiter, raw inbound
    //! parsing, the builtin provider catalog, and Prometheus metric rendering.

    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client stub that answers every call with `{ "method": <method> }`.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            let echo = json!({ "method": method });
            Ok(echo)
        }
    }

    /// Minimal connector whose `activate` bumps a shared counter.
    ///
    /// Tests use the counter to observe which providers the registry actually activated.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            let kinds = vec![TriggerKind::from("webhook")];
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds,
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            self.kinds.as_slice()
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            let handle = ActivationHandle::new(self.provider_id.clone(), bindings.len());
            Ok(handle)
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            let reason = String::from("not needed for registry tests");
            Err(ConnectorError::Unsupported(reason))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    /// Registering a second connector for an existing provider id is an error.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let original = FakeConnector::new("github", Arc::clone(&calls));
        registry.register(Box::new(original)).unwrap();

        let duplicate = FakeConnector::new("github", calls);
        let error = registry
            .register(Box::new(duplicate))
            .expect_err("a second 'github' connector must be rejected");
        let is_duplicate_github = matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        );
        assert!(is_duplicate_github);
    }

    /// `activate_all` only activates connectors that have bindings.
    ///
    /// A bound connector receives all of its bindings in a single call.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        for (provider, calls) in [("github", &github_calls), ("slack", &slack_calls)] {
            registry
                .register(Box::new(FakeConnector::new(provider, Arc::clone(calls))))
                .unwrap();
        }

        // Only github gets bindings; slack stays unbound.
        let mut trigger_registry = TriggerRegistry::default();
        for event_kind in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                event_kind,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        let github_handle = &handles[0];
        assert_eq!(github_handle.provider.as_str(), "github");
        assert_eq!(github_handle.binding_count, 2);

        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    /// Tokens are tracked per (provider, key) pair.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let limiter = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });
        let github = ProviderId::from("github");
        let slack = ProviderId::from("slack");

        // The single token for (github, org:1) is spent by the first acquire...
        assert!(limiter.try_acquire(&github, "org:1"));
        assert!(!limiter.try_acquire(&github, "org:1"));
        // ...while other keys and other providers are tracked separately.
        assert!(limiter.try_acquire(&github, "org:2"));
        assert!(limiter.try_acquire(&slack, "org:1"));
    }

    /// `json_body` parses the stored body bytes as JSON.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers =
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]);
        let body = br#"{"ok":true}"#.to_vec();
        let raw = RawInbound::new("push", headers, body);

        let parsed = raw.json_body().unwrap();
        assert_eq!(parsed, json!({ "ok": true }));
    }

    /// The default registry lists core catalog providers and no service providers.
    #[test]
    fn connector_registry_lists_core_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        for present in ["cron", "webhook", "kafka"] {
            assert!(providers.contains(&ProviderId::from(present)));
        }
        for absent in ["github", "slack"] {
            assert!(!providers.contains(&ProviderId::from(absent)));
        }
    }

    /// Every provider registered with a `Builtin` runtime must be on the core allow-list.
    #[test]
    fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
        const CORE_RUNTIME_PROVIDERS: [&str; 9] = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        for provider in registered_provider_metadata()
            .into_iter()
            .filter(|provider| matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }))
        {
            let name = provider.provider.as_str();
            assert!(
                CORE_RUNTIME_PROVIDERS.contains(&name),
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    /// Records one sample in every metric family.
    ///
    /// Then checks that the rendered Prometheus text contains each expected series line.
    // NOTE(review): the needles spell labels in alphabetical order, presumably mirroring
    // the renderer's label sorting — confirm if the renderer changes.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let trigger = "github-new-issue";
        let binding = "github-new-issue@v7";
        let tenant = Some("tenant-a");
        let pending_topic = "orchestrator.triggers.pending";
        let pump_topic = "trigger.inbox.envelopes";

        let registry = MetricsRegistry::default();
        registry.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        registry.record_trigger_received(trigger, "github");
        registry.record_trigger_deduped(trigger, "inbox_duplicate");
        registry.record_trigger_predicate_evaluation(trigger, true, 0.002);
        registry.record_trigger_dispatched(trigger, "local", "succeeded");
        registry.record_trigger_retry(trigger, 2);
        registry.record_trigger_dlq(trigger, "retry_exhausted");
        registry.set_trigger_inflight(trigger, 0);
        registry.record_event_log_append(pending_topic, StdDuration::from_millis(1), 2048);
        registry.set_event_log_consumer_lag(pending_topic, "orchestrator-pump", 0);
        registry.set_trigger_budget_cost_today(trigger, 0.002);
        registry.record_trigger_budget_exhausted(trigger, "daily_budget_exceeded");
        registry.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        registry.set_worker_queue_depth("triage", 1);
        registry.record_worker_queue_claim_age("triage", 3.0);
        registry.set_orchestrator_pump_backlog(pump_topic, 2);
        registry.set_orchestrator_pump_outstanding(pump_topic, 1);
        registry.record_orchestrator_pump_admission_delay(pump_topic, StdDuration::from_millis(50));
        registry.record_trigger_accepted_to_normalized(
            trigger,
            binding,
            "github",
            tenant,
            "normalized",
            StdDuration::from_millis(25),
        );
        registry.record_trigger_accepted_to_queue_append(
            trigger,
            binding,
            "github",
            tenant,
            "queued",
            StdDuration::from_millis(40),
        );
        registry.record_trigger_queue_age_at_dispatch_admission(
            trigger,
            binding,
            "github",
            tenant,
            "admitted",
            StdDuration::from_millis(75),
        );
        registry.record_trigger_queue_age_at_dispatch_start(
            trigger,
            binding,
            "github",
            tenant,
            "started",
            StdDuration::from_millis(125),
        );
        registry.record_trigger_dispatch_runtime(
            trigger,
            binding,
            "github",
            tenant,
            "succeeded",
            StdDuration::from_millis(250),
        );
        registry.record_trigger_retry_delay(
            trigger,
            binding,
            "github",
            tenant,
            "scheduled",
            StdDuration::from_secs(2),
        );
        registry.record_trigger_accepted_to_dlq(
            trigger,
            binding,
            "github",
            tenant,
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        registry.record_backpressure_event("ingest", "reject");
        // NOTE(review): 1_000 and 4_000 differ by 3_000 and the oldest-pending needle expects
        // an age of 3, so these look like millisecond timestamps — confirm.
        registry.note_trigger_pending_event(
            "evt-1",
            trigger,
            binding,
            "github",
            tenant,
            1_000,
            4_000,
        );
        registry.record_llm_call("mock", "mock", "succeeded", 0.01);
        registry.record_llm_cache_hit("mock");

        let expected = [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ];

        let rendered = registry.render_prometheus();
        let missing: Vec<&str> = expected
            .iter()
            .copied()
            .filter(|needle| !rendered.contains(*needle))
            .collect();
        assert!(missing.is_empty(), "missing {missing:?}\n{rendered}");
    }
}