Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44    connector_export_denied_builtin_reason, connector_export_effect_class,
45    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57    SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60    paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61    ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
/// Total request timeout (reqwest `ClientBuilder::timeout`) applied to every
/// client built by `outbound_http_client`: thirty seconds.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_millis(30_000);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70    reqwest::Client::builder()
71        .user_agent(user_agent)
72        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73        .redirect(reqwest::redirect::Policy::custom(|attempt| {
74            if attempt.previous().len() >= 10 {
75                attempt.error("too many redirects")
76            } else if crate::egress::redirect_url_allowed(
77                "connector_redirect",
78                attempt.url().as_str(),
79            ) {
80                attempt.follow()
81            } else {
82                attempt.error("egress policy blocked redirect target")
83            }
84        }))
85        .build()
86        .expect("connector HTTP client configuration should be valid")
87}
88
89/// Shared owned handle to a connector instance registered with the runtime.
90pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
91
92thread_local! {
93    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
94        RefCell::new(BTreeMap::new());
95}
96
97/// Provider implementation contract for inbound connectors.
98#[async_trait]
99pub trait Connector: Send + Sync {
100    /// Stable provider id such as `github`, `slack`, or `webhook`.
101    fn provider_id(&self) -> &ProviderId;
102
103    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
104    fn kinds(&self) -> &[TriggerKind];
105
106    /// Called once per connector instance at orchestrator startup.
107    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
108
109    /// Activate the bindings relevant to this connector instance.
110    async fn activate(
111        &self,
112        bindings: &[TriggerBinding],
113    ) -> Result<ActivationHandle, ConnectorError>;
114
115    /// Stop connector-owned background work and flush any connector-local state.
116    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
117        Ok(())
118    }
119
120    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
121    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
122
123    /// Verify + normalize a provider-native inbound request into the richer
124    /// connector result contract used by ack-first webhook adapters.
125    async fn normalize_inbound_result(
126        &self,
127        raw: RawInbound,
128    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
129        self.normalize_inbound(raw)
130            .await
131            .map(ConnectorNormalizeResult::event)
132    }
133
134    /// Payload schema surfaced to future trigger-type narrowing.
135    fn payload_schema(&self) -> ProviderPayloadSchema;
136
137    /// Outbound API wrapper exposed to handlers.
138    fn client(&self) -> Arc<dyn ConnectorClient>;
139}
140
141/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
142#[derive(Clone, Debug, PartialEq)]
143pub struct ConnectorHttpResponse {
144    pub status: u16,
145    pub headers: BTreeMap<String, String>,
146    pub body: JsonValue,
147}
148
149impl ConnectorHttpResponse {
150    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
151        Self {
152            status,
153            headers,
154            body,
155        }
156    }
157}
158
159/// Normalized inbound result accepted by the runtime connector adapter.
160#[derive(Clone, Debug, PartialEq)]
161pub enum ConnectorNormalizeResult {
162    Event(Box<TriggerEvent>),
163    Batch(Vec<TriggerEvent>),
164    ImmediateResponse {
165        response: ConnectorHttpResponse,
166        events: Vec<TriggerEvent>,
167    },
168    Reject(ConnectorHttpResponse),
169}
170
171impl ConnectorNormalizeResult {
172    pub fn event(event: TriggerEvent) -> Self {
173        Self::Event(Box::new(event))
174    }
175
176    pub fn into_events(self) -> Vec<TriggerEvent> {
177        match self {
178            Self::Event(event) => vec![*event],
179            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
180            Self::Reject(_) => Vec::new(),
181        }
182    }
183}
184
185#[derive(Clone, Debug, PartialEq)]
186pub enum PostNormalizeOutcome {
187    Ready(Box<TriggerEvent>),
188    DuplicateDropped,
189}
190
191pub async fn postprocess_normalized_event(
192    inbox: &InboxIndex,
193    binding_id: &str,
194    dedupe_enabled: bool,
195    dedupe_ttl: StdDuration,
196    mut event: TriggerEvent,
197) -> Result<PostNormalizeOutcome, ConnectorError> {
198    if dedupe_enabled && !event.dedupe_claimed() {
199        if !inbox
200            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
201            .await?
202        {
203            return Ok(PostNormalizeOutcome::DuplicateDropped);
204        }
205        event.mark_dedupe_claimed();
206    }
207
208    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
209}
210
211/// Outbound provider client interface used by connector-backed stdlib modules.
212#[async_trait]
213pub trait ConnectorClient: Send + Sync {
214    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
215}
216
217/// Minimal outbound client errors shared by connector implementations.
218#[derive(Clone, Debug, PartialEq, Eq)]
219pub enum ClientError {
220    MethodNotFound(String),
221    InvalidArgs(String),
222    RateLimited(String),
223    Transport(String),
224    EgressBlocked(crate::egress::EgressBlocked),
225    Other(String),
226}
227
228impl fmt::Display for ClientError {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        match self {
231            Self::MethodNotFound(message)
232            | Self::InvalidArgs(message)
233            | Self::RateLimited(message)
234            | Self::Transport(message)
235            | Self::Other(message) => message.fmt(f),
236            Self::EgressBlocked(blocked) => blocked.fmt(f),
237        }
238    }
239}
240
241impl std::error::Error for ClientError {}
242
243/// Shared connector-layer errors.
244#[derive(Debug)]
245pub enum ConnectorError {
246    DuplicateProvider(String),
247    DuplicateDelivery(String),
248    UnknownProvider(String),
249    MissingHeader(String),
250    InvalidHeader {
251        name: String,
252        detail: String,
253    },
254    InvalidSignature(String),
255    TimestampOutOfWindow {
256        timestamp: OffsetDateTime,
257        now: OffsetDateTime,
258        window: time::Duration,
259    },
260    Json(String),
261    Secret(String),
262    EventLog(String),
263    HarnRuntime(String),
264    Client(ClientError),
265    Unsupported(String),
266    Activation(String),
267}
268
269impl ConnectorError {
270    pub fn invalid_signature(message: impl Into<String>) -> Self {
271        Self::InvalidSignature(message.into())
272    }
273}
274
275impl fmt::Display for ConnectorError {
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        match self {
278            Self::DuplicateProvider(provider) => {
279                write!(f, "connector provider `{provider}` is already registered")
280            }
281            Self::DuplicateDelivery(message) => message.fmt(f),
282            Self::UnknownProvider(provider) => {
283                write!(f, "connector provider `{provider}` is not registered")
284            }
285            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
286            Self::InvalidHeader { name, detail } => {
287                write!(f, "invalid header `{name}`: {detail}")
288            }
289            Self::InvalidSignature(message)
290            | Self::Json(message)
291            | Self::Secret(message)
292            | Self::EventLog(message)
293            | Self::HarnRuntime(message)
294            | Self::Unsupported(message)
295            | Self::Activation(message) => message.fmt(f),
296            Self::TimestampOutOfWindow {
297                timestamp,
298                now,
299                window,
300            } => write!(
301                f,
302                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
303            ),
304            Self::Client(error) => error.fmt(f),
305        }
306    }
307}
308
309impl std::error::Error for ConnectorError {}
310
311impl From<crate::event_log::LogError> for ConnectorError {
312    fn from(value: crate::event_log::LogError) -> Self {
313        Self::EventLog(value.to_string())
314    }
315}
316
317impl From<crate::secrets::SecretError> for ConnectorError {
318    fn from(value: crate::secrets::SecretError) -> Self {
319        Self::Secret(value.to_string())
320    }
321}
322
323impl From<serde_json::Error> for ConnectorError {
324    fn from(value: serde_json::Error) -> Self {
325        Self::Json(value.to_string())
326    }
327}
328
329impl From<ClientError> for ConnectorError {
330    fn from(value: ClientError) -> Self {
331        Self::Client(value)
332    }
333}
334
335/// Startup context shared with connector instances.
336#[derive(Clone)]
337pub struct ConnectorCtx {
338    pub event_log: Arc<AnyEventLog>,
339    pub secrets: Arc<dyn SecretProvider>,
340    pub inbox: Arc<InboxIndex>,
341    pub metrics: Arc<MetricsRegistry>,
342    pub rate_limiter: Arc<RateLimiterFactory>,
343}
344
/// Point-in-time copy of the fixed connector counters, for tests and
/// diagnostics. Produced by `MetricsRegistry::snapshot`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    /// Dedupe claims written to the inbox.
    pub inbox_claims_written: u64,
    /// Duplicates rejected (fast path plus durable path).
    pub inbox_duplicates_rejected: u64,
    /// Duplicates caught by the inbox fast path.
    pub inbox_fast_path_hits: u64,
    /// Duplicates caught by the durable inbox path.
    pub inbox_durable_hits: u64,
    /// Expired inbox entries observed.
    pub inbox_expired_entries: u64,
    /// Most recently reported number of active inbox entries.
    pub inbox_active_entries: u64,
    /// Linear webhooks rejected for their timestamp.
    pub linear_timestamp_rejections_total: u64,
    /// Successful dispatches.
    pub dispatch_succeeded_total: u64,
    /// Failed dispatches.
    pub dispatch_failed_total: u64,
    /// Retries scheduled.
    pub retry_scheduled_total: u64,
    /// Successful Slack deliveries.
    pub slack_delivery_success_total: u64,
    /// Failed Slack deliveries.
    pub slack_delivery_failure_total: u64,
}
361
/// Label set attached to a metric sample: label name to label value, kept
/// sorted by name so equal label sets compare and order identically.
type MetricLabels = BTreeMap<String, String>;
363
/// Accumulated state of one labelled histogram series.
#[derive(Debug, Clone, PartialEq, Default)]
struct HistogramMetric {
    /// Per-bucket counts, keyed by the bucket's string form.
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of observed values.
    sum: f64,
}
370
371static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
372
373pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
374    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
375    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
376}
377
378pub fn clear_active_metrics_registry() {
379    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
380        *slot.lock().expect("active metrics registry poisoned") = None;
381    }
382}
383
384pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
385    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
386        slot.lock()
387            .expect("active metrics registry poisoned")
388            .clone()
389    })
390}
391
392/// Shared metrics surface for connector-local counters and timings.
393#[derive(Debug, Default)]
394pub struct MetricsRegistry {
395    inbox_claims_written: AtomicU64,
396    inbox_duplicates_rejected: AtomicU64,
397    inbox_fast_path_hits: AtomicU64,
398    inbox_durable_hits: AtomicU64,
399    inbox_expired_entries: AtomicU64,
400    inbox_active_entries: AtomicU64,
401    linear_timestamp_rejections_total: AtomicU64,
402    dispatch_succeeded_total: AtomicU64,
403    dispatch_failed_total: AtomicU64,
404    retry_scheduled_total: AtomicU64,
405    slack_delivery_success_total: AtomicU64,
406    slack_delivery_failure_total: AtomicU64,
407    custom_counters: Mutex<BTreeMap<String, u64>>,
408    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
409    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
410    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
411    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
412}
413
414impl MetricsRegistry {
415    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
416    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
417        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
418    ];
419    const SIZE_BUCKETS: [f64; 9] = [
420        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
421    ];
422
423    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
424        ConnectorMetricsSnapshot {
425            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
426            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
427            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
428            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
429            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
430            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
431            linear_timestamp_rejections_total: self
432                .linear_timestamp_rejections_total
433                .load(Ordering::Relaxed),
434            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
435            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
436            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
437            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
438            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
439        }
440    }
441
442    pub(crate) fn record_inbox_claim(&self) {
443        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
444    }
445
446    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
447        self.inbox_duplicates_rejected
448            .fetch_add(1, Ordering::Relaxed);
449        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
450    }
451
452    pub(crate) fn record_inbox_duplicate_durable(&self) {
453        self.inbox_duplicates_rejected
454            .fetch_add(1, Ordering::Relaxed);
455        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
456    }
457
458    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
459        if count > 0 {
460            self.inbox_expired_entries
461                .fetch_add(count, Ordering::Relaxed);
462        }
463    }
464
465    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
466        self.inbox_active_entries
467            .store(count as u64, Ordering::Relaxed);
468    }
469
470    pub fn record_linear_timestamp_rejection(&self) {
471        self.linear_timestamp_rejections_total
472            .fetch_add(1, Ordering::Relaxed);
473    }
474
475    pub fn record_dispatch_succeeded(&self) {
476        self.dispatch_succeeded_total
477            .fetch_add(1, Ordering::Relaxed);
478    }
479
480    pub fn record_dispatch_failed(&self) {
481        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
482    }
483
484    pub fn record_retry_scheduled(&self) {
485        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
486    }
487
488    pub fn record_slack_delivery_success(&self) {
489        self.slack_delivery_success_total
490            .fetch_add(1, Ordering::Relaxed);
491    }
492
493    pub fn record_slack_delivery_failure(&self) {
494        self.slack_delivery_failure_total
495            .fetch_add(1, Ordering::Relaxed);
496    }
497
498    pub fn record_custom_counter(&self, name: &str, amount: u64) {
499        if amount == 0 {
500            return;
501        }
502        let mut counters = self
503            .custom_counters
504            .lock()
505            .expect("custom counters poisoned");
506        *counters.entry(name.to_string()).or_default() += amount;
507    }
508
509    pub fn record_http_request(
510        &self,
511        endpoint: &str,
512        method: &str,
513        status: u16,
514        duration: StdDuration,
515        body_size_bytes: usize,
516    ) {
517        self.increment_counter(
518            "harn_http_requests_total",
519            labels([
520                ("endpoint", endpoint),
521                ("method", method),
522                ("status", &status.to_string()),
523            ]),
524            1,
525        );
526        self.observe_histogram(
527            "harn_http_request_duration_seconds",
528            labels([("endpoint", endpoint)]),
529            duration.as_secs_f64(),
530            &Self::DURATION_BUCKETS,
531        );
532        self.observe_histogram(
533            "harn_http_body_size_bytes",
534            labels([("endpoint", endpoint)]),
535            body_size_bytes as f64,
536            &Self::SIZE_BUCKETS,
537        );
538    }
539
540    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
541        self.increment_counter(
542            "harn_trigger_received_total",
543            labels([("trigger_id", trigger_id), ("provider", provider)]),
544            1,
545        );
546    }
547
548    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
549        self.increment_counter(
550            "harn_trigger_deduped_total",
551            labels([("trigger_id", trigger_id), ("reason", reason)]),
552            1,
553        );
554    }
555
556    pub fn record_trigger_predicate_evaluation(
557        &self,
558        trigger_id: &str,
559        result: bool,
560        cost_usd: f64,
561    ) {
562        self.increment_counter(
563            "harn_trigger_predicate_evaluations_total",
564            labels([
565                ("trigger_id", trigger_id),
566                ("result", if result { "true" } else { "false" }),
567            ]),
568            1,
569        );
570        self.observe_histogram(
571            "harn_trigger_predicate_cost_usd",
572            labels([("trigger_id", trigger_id)]),
573            cost_usd.max(0.0),
574            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
575        );
576    }
577
578    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
579        self.increment_counter(
580            "harn_trigger_dispatched_total",
581            labels([
582                ("trigger_id", trigger_id),
583                ("handler_kind", handler_kind),
584                ("outcome", outcome),
585            ]),
586            1,
587        );
588    }
589
590    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
591        self.increment_counter(
592            "harn_trigger_retries_total",
593            labels([
594                ("trigger_id", trigger_id),
595                ("attempt", &attempt.to_string()),
596            ]),
597            1,
598        );
599    }
600
601    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
602        self.increment_counter(
603            "harn_trigger_dlq_total",
604            labels([("trigger_id", trigger_id), ("reason", reason)]),
605            1,
606        );
607    }
608
609    pub fn record_trigger_accepted_to_normalized(
610        &self,
611        trigger_id: &str,
612        binding_key: &str,
613        provider: &str,
614        tenant_id: Option<&str>,
615        status: &str,
616        duration: StdDuration,
617    ) {
618        self.observe_histogram(
619            "harn_trigger_webhook_accepted_to_normalized_seconds",
620            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
621            duration.as_secs_f64(),
622            &Self::TRIGGER_LATENCY_BUCKETS,
623        );
624    }
625
626    pub fn record_trigger_accepted_to_queue_append(
627        &self,
628        trigger_id: &str,
629        binding_key: &str,
630        provider: &str,
631        tenant_id: Option<&str>,
632        status: &str,
633        duration: StdDuration,
634    ) {
635        self.observe_histogram(
636            "harn_trigger_webhook_accepted_to_queue_append_seconds",
637            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
638            duration.as_secs_f64(),
639            &Self::TRIGGER_LATENCY_BUCKETS,
640        );
641    }
642
643    pub fn record_trigger_queue_age_at_dispatch_admission(
644        &self,
645        trigger_id: &str,
646        binding_key: &str,
647        provider: &str,
648        tenant_id: Option<&str>,
649        status: &str,
650        age: StdDuration,
651    ) {
652        self.observe_histogram(
653            "harn_trigger_queue_age_at_dispatch_admission_seconds",
654            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
655            age.as_secs_f64(),
656            &Self::TRIGGER_LATENCY_BUCKETS,
657        );
658    }
659
660    pub fn record_trigger_queue_age_at_dispatch_start(
661        &self,
662        trigger_id: &str,
663        binding_key: &str,
664        provider: &str,
665        tenant_id: Option<&str>,
666        status: &str,
667        age: StdDuration,
668    ) {
669        self.observe_histogram(
670            "harn_trigger_queue_age_at_dispatch_start_seconds",
671            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
672            age.as_secs_f64(),
673            &Self::TRIGGER_LATENCY_BUCKETS,
674        );
675    }
676
677    pub fn record_trigger_dispatch_runtime(
678        &self,
679        trigger_id: &str,
680        binding_key: &str,
681        provider: &str,
682        tenant_id: Option<&str>,
683        status: &str,
684        duration: StdDuration,
685    ) {
686        self.observe_histogram(
687            "harn_trigger_dispatch_runtime_seconds",
688            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
689            duration.as_secs_f64(),
690            &Self::TRIGGER_LATENCY_BUCKETS,
691        );
692    }
693
694    pub fn record_trigger_retry_delay(
695        &self,
696        trigger_id: &str,
697        binding_key: &str,
698        provider: &str,
699        tenant_id: Option<&str>,
700        status: &str,
701        duration: StdDuration,
702    ) {
703        self.observe_histogram(
704            "harn_trigger_retry_delay_seconds",
705            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
706            duration.as_secs_f64(),
707            &Self::TRIGGER_LATENCY_BUCKETS,
708        );
709    }
710
711    pub fn record_trigger_accepted_to_dlq(
712        &self,
713        trigger_id: &str,
714        binding_key: &str,
715        provider: &str,
716        tenant_id: Option<&str>,
717        status: &str,
718        duration: StdDuration,
719    ) {
720        self.observe_histogram(
721            "harn_trigger_accepted_to_dlq_seconds",
722            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
723            duration.as_secs_f64(),
724            &Self::TRIGGER_LATENCY_BUCKETS,
725        );
726    }
727
728    pub fn note_trigger_pending_event(
729        &self,
730        event_id: &str,
731        trigger_id: &str,
732        binding_key: &str,
733        provider: &str,
734        tenant_id: Option<&str>,
735        accepted_at_ms: i64,
736        now_ms: i64,
737    ) {
738        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
739        {
740            let mut pending = self
741                .pending_trigger_events
742                .lock()
743                .expect("pending trigger events poisoned");
744            pending
745                .entry(labels.clone())
746                .or_default()
747                .insert(event_id.to_string(), accepted_at_ms);
748        }
749        self.refresh_oldest_pending_gauge(labels, now_ms);
750    }
751
752    pub fn clear_trigger_pending_event(
753        &self,
754        event_id: &str,
755        trigger_id: &str,
756        binding_key: &str,
757        provider: &str,
758        tenant_id: Option<&str>,
759        now_ms: i64,
760    ) {
761        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
762        {
763            let mut pending = self
764                .pending_trigger_events
765                .lock()
766                .expect("pending trigger events poisoned");
767            if let Some(events) = pending.get_mut(&labels) {
768                events.remove(event_id);
769                if events.is_empty() {
770                    pending.remove(&labels);
771                }
772            }
773        }
774        self.refresh_oldest_pending_gauge(labels, now_ms);
775    }
776
777    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
778        self.set_gauge(
779            "harn_trigger_inflight",
780            labels([("trigger_id", trigger_id)]),
781            count as f64,
782        );
783    }
784
785    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
786        self.set_gauge(
787            "harn_trigger_budget_cost_today_usd",
788            labels([("trigger_id", trigger_id)]),
789            cost_usd.max(0.0),
790        );
791    }
792
793    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
794        self.increment_counter(
795            "harn_trigger_budget_exhausted_total",
796            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
797            1,
798        );
799    }
800
801    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
802        self.increment_counter(
803            "harn_backpressure_events_total",
804            labels([("dimension", dimension), ("action", action)]),
805            1,
806        );
807    }
808
809    pub fn record_event_log_append(
810        &self,
811        topic: &str,
812        duration: StdDuration,
813        payload_bytes: usize,
814    ) {
815        self.observe_histogram(
816            "harn_event_log_append_duration_seconds",
817            labels([("topic", topic)]),
818            duration.as_secs_f64(),
819            &Self::DURATION_BUCKETS,
820        );
821        self.set_gauge(
822            "harn_event_log_topic_size_bytes",
823            labels([("topic", topic)]),
824            payload_bytes as f64,
825        );
826    }
827
828    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
829        self.set_gauge(
830            "harn_event_log_consumer_lag",
831            labels([("topic", topic), ("consumer", consumer)]),
832            lag as f64,
833        );
834    }
835
836    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
837        self.increment_counter(
838            "harn_a2a_hops_total",
839            labels([("target", target), ("outcome", outcome)]),
840            1,
841        );
842        self.observe_histogram(
843            "harn_a2a_hop_duration_seconds",
844            labels([("target", target)]),
845            duration.as_secs_f64(),
846            &Self::DURATION_BUCKETS,
847        );
848    }
849
850    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
851        self.set_gauge(
852            "harn_worker_queue_depth",
853            labels([("queue", queue)]),
854            depth as f64,
855        );
856    }
857
858    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
859        self.observe_histogram(
860            "harn_worker_queue_claim_age_seconds",
861            labels([("queue", queue)]),
862            age_seconds.max(0.0),
863            &Self::DURATION_BUCKETS,
864        );
865    }
866
867    /// Increment the scheduler-selection counter for a particular fairness key.
868    pub fn record_scheduler_selection(
869        &self,
870        queue: &str,
871        fairness_dimension: &str,
872        fairness_key: &str,
873    ) {
874        self.increment_counter(
875            "harn_scheduler_selections_total",
876            labels([
877                ("queue", queue),
878                ("fairness_dimension", fairness_dimension),
879                ("fairness_key", fairness_key),
880            ]),
881            1,
882        );
883    }
884
885    /// Increment the scheduler-deferred counter (queue had work but couldn't
886    /// be selected because the key was at its concurrency cap).
887    pub fn record_scheduler_deferral(
888        &self,
889        queue: &str,
890        fairness_dimension: &str,
891        fairness_key: &str,
892    ) {
893        self.increment_counter(
894            "harn_scheduler_deferrals_total",
895            labels([
896                ("queue", queue),
897                ("fairness_dimension", fairness_dimension),
898                ("fairness_key", fairness_key),
899            ]),
900            1,
901        );
902    }
903
904    /// Increment the scheduler starvation-promotion counter.
905    pub fn record_scheduler_starvation_promotion(
906        &self,
907        queue: &str,
908        fairness_dimension: &str,
909        fairness_key: &str,
910    ) {
911        self.increment_counter(
912            "harn_scheduler_starvation_promotions_total",
913            labels([
914                ("queue", queue),
915                ("fairness_dimension", fairness_dimension),
916                ("fairness_key", fairness_key),
917            ]),
918            1,
919        );
920    }
921
922    /// Set the current scheduler deficit gauge for a fairness key.
923    pub fn set_scheduler_deficit(
924        &self,
925        queue: &str,
926        fairness_dimension: &str,
927        fairness_key: &str,
928        deficit: i64,
929    ) {
930        self.set_gauge(
931            "harn_scheduler_deficit",
932            labels([
933                ("queue", queue),
934                ("fairness_dimension", fairness_dimension),
935                ("fairness_key", fairness_key),
936            ]),
937            deficit as f64,
938        );
939    }
940
941    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
942    pub fn set_scheduler_oldest_eligible_age(
943        &self,
944        queue: &str,
945        fairness_dimension: &str,
946        fairness_key: &str,
947        age_ms: u64,
948    ) {
949        self.set_gauge(
950            "harn_scheduler_oldest_eligible_age_seconds",
951            labels([
952                ("queue", queue),
953                ("fairness_dimension", fairness_dimension),
954                ("fairness_key", fairness_key),
955            ]),
956            age_ms as f64 / 1000.0,
957        );
958    }
959
960    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
961        self.set_gauge(
962            "harn_orchestrator_pump_backlog",
963            labels([("topic", topic)]),
964            count as f64,
965        );
966    }
967
968    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
969        self.set_gauge(
970            "harn_orchestrator_pump_outstanding",
971            labels([("topic", topic)]),
972            count as f64,
973        );
974    }
975
976    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
977        self.observe_histogram(
978            "harn_orchestrator_pump_admission_delay_seconds",
979            labels([("topic", topic)]),
980            duration.as_secs_f64(),
981            &Self::DURATION_BUCKETS,
982        );
983    }
984
985    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
986        self.increment_counter(
987            "harn_llm_calls_total",
988            labels([
989                ("provider", provider),
990                ("model", model),
991                ("outcome", outcome),
992            ]),
993            1,
994        );
995        if cost_usd > 0.0 {
996            self.increment_counter(
997                "harn_llm_cost_usd_total",
998                labels([("provider", provider), ("model", model)]),
999                cost_usd,
1000            );
1001        } else {
1002            self.ensure_counter(
1003                "harn_llm_cost_usd_total",
1004                labels([("provider", provider), ("model", model)]),
1005            );
1006        }
1007    }
1008
1009    pub fn record_llm_cache_hit(&self, provider: &str) {
1010        self.increment_counter(
1011            "harn_llm_cache_hits_total",
1012            labels([("provider", provider)]),
1013            1,
1014        );
1015    }
1016
1017    pub fn render_prometheus(&self) -> String {
1018        let snapshot = self.snapshot();
1019        let counters = [
1020            (
1021                "connector_linear_timestamp_rejections_total",
1022                snapshot.linear_timestamp_rejections_total,
1023            ),
1024            (
1025                "dispatch_succeeded_total",
1026                snapshot.dispatch_succeeded_total,
1027            ),
1028            ("dispatch_failed_total", snapshot.dispatch_failed_total),
1029            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1030            ("retry_scheduled_total", snapshot.retry_scheduled_total),
1031            (
1032                "slack_events_delivery_success_total",
1033                snapshot.slack_delivery_success_total,
1034            ),
1035            (
1036                "slack_events_delivery_failure_total",
1037                snapshot.slack_delivery_failure_total,
1038            ),
1039        ];
1040
1041        let mut rendered = String::new();
1042        for (name, value) in counters {
1043            rendered.push_str("# TYPE ");
1044            rendered.push_str(name);
1045            rendered.push_str(" counter\n");
1046            rendered.push_str(name);
1047            rendered.push(' ');
1048            rendered.push_str(&value.to_string());
1049            rendered.push('\n');
1050        }
1051        let custom_counters = self
1052            .custom_counters
1053            .lock()
1054            .expect("custom counters poisoned");
1055        for (name, value) in custom_counters.iter() {
1056            let metric_name = format!(
1057                "connector_custom_{}_total",
1058                name.chars()
1059                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1060                        ch
1061                    } else {
1062                        '_'
1063                    })
1064                    .collect::<String>()
1065            );
1066            rendered.push_str("# TYPE ");
1067            rendered.push_str(&metric_name);
1068            rendered.push_str(" counter\n");
1069            rendered.push_str(&metric_name);
1070            rendered.push(' ');
1071            rendered.push_str(&value.to_string());
1072            rendered.push('\n');
1073        }
1074        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1075        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1076        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1077        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1078        self.render_generic_metrics(&mut rendered);
1079        rendered
1080    }
1081
1082    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1083        let amount = amount.into();
1084        if amount <= 0.0 || !amount.is_finite() {
1085            return;
1086        }
1087        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1088        *counters.entry((name.to_string(), labels)).or_default() += amount;
1089    }
1090
1091    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1092        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1093        counters.entry((name.to_string(), labels)).or_default();
1094    }
1095
1096    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1097        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1098        gauges.insert((name.to_string(), labels), value);
1099    }
1100
1101    fn observe_histogram(
1102        &self,
1103        name: &str,
1104        labels: MetricLabels,
1105        value: f64,
1106        bucket_bounds: &[f64],
1107    ) {
1108        if !value.is_finite() {
1109            return;
1110        }
1111        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1112        let histogram = histograms
1113            .entry((name.to_string(), labels))
1114            .or_insert_with(|| HistogramMetric {
1115                buckets: bucket_bounds
1116                    .iter()
1117                    .map(|bound| (prometheus_float(*bound), 0))
1118                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1119                    .collect(),
1120                count: 0,
1121                sum: 0.0,
1122            });
1123        histogram.count += 1;
1124        histogram.sum += value;
1125        for bound in bucket_bounds {
1126            if value <= *bound {
1127                let key = prometheus_float(*bound);
1128                *histogram.buckets.entry(key).or_default() += 1;
1129            }
1130        }
1131        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1132    }
1133
1134    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1135        let oldest_accepted_at_ms = self
1136            .pending_trigger_events
1137            .lock()
1138            .expect("pending trigger events poisoned")
1139            .get(&labels)
1140            .and_then(|events| events.values().min().copied());
1141        let age_seconds = oldest_accepted_at_ms
1142            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1143            .unwrap_or(0.0);
1144        self.set_gauge(
1145            "harn_trigger_oldest_pending_age_seconds",
1146            labels,
1147            age_seconds,
1148        );
1149    }
1150
1151    fn render_generic_metrics(&self, rendered: &mut String) {
1152        let counters = self
1153            .counters
1154            .lock()
1155            .expect("metrics counters poisoned")
1156            .clone();
1157        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1158        let histograms = self
1159            .histograms
1160            .lock()
1161            .expect("metrics histograms poisoned")
1162            .clone();
1163
1164        for name in metric_family_names(MetricKind::Counter) {
1165            rendered.push_str("# TYPE ");
1166            rendered.push_str(name);
1167            rendered.push_str(" counter\n");
1168            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1169                render_sample(rendered, sample_name, labels, *value);
1170            }
1171        }
1172        for name in metric_family_names(MetricKind::Gauge) {
1173            rendered.push_str("# TYPE ");
1174            rendered.push_str(name);
1175            rendered.push_str(" gauge\n");
1176            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1177                render_sample(rendered, sample_name, labels, *value);
1178            }
1179        }
1180        for name in metric_family_names(MetricKind::Histogram) {
1181            rendered.push_str("# TYPE ");
1182            rendered.push_str(name);
1183            rendered.push_str(" histogram\n");
1184            for ((sample_name, labels), histogram) in
1185                histograms.iter().filter(|((n, _), _)| n == name)
1186            {
1187                for (le, value) in &histogram.buckets {
1188                    let mut bucket_labels = labels.clone();
1189                    bucket_labels.insert("le".to_string(), le.clone());
1190                    render_sample(
1191                        rendered,
1192                        &format!("{sample_name}_bucket"),
1193                        &bucket_labels,
1194                        *value as f64,
1195                    );
1196                }
1197                render_sample(
1198                    rendered,
1199                    &format!("{sample_name}_sum"),
1200                    labels,
1201                    histogram.sum,
1202                );
1203                render_sample(
1204                    rendered,
1205                    &format!("{sample_name}_count"),
1206                    labels,
1207                    histogram.count as f64,
1208                );
1209            }
1210        }
1211    }
1212}
1213
/// Prometheus metric type of a generic metric family.
///
/// Selects which name list `metric_family_names` returns, and therefore which
/// `# TYPE` keyword `render_generic_metrics` writes for those families.
#[derive(Clone, Copy)]
enum MetricKind {
    /// Families rendered with `# TYPE <name> counter`.
    Counter,
    /// Families rendered with `# TYPE <name> gauge`.
    Gauge,
    /// Families rendered with `# TYPE <name> histogram`, as `_bucket`, `_sum`
    /// and `_count` samples.
    Histogram,
}
1220
1221fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
1222    match kind {
1223        MetricKind::Counter => &[
1224            "harn_http_requests_total",
1225            "harn_trigger_received_total",
1226            "harn_trigger_deduped_total",
1227            "harn_trigger_predicate_evaluations_total",
1228            "harn_trigger_dispatched_total",
1229            "harn_trigger_retries_total",
1230            "harn_trigger_dlq_total",
1231            "harn_trigger_budget_exhausted_total",
1232            "harn_backpressure_events_total",
1233            "harn_a2a_hops_total",
1234            "harn_llm_calls_total",
1235            "harn_llm_cost_usd_total",
1236            "harn_llm_cache_hits_total",
1237            "harn_scheduler_selections_total",
1238            "harn_scheduler_deferrals_total",
1239            "harn_scheduler_starvation_promotions_total",
1240        ],
1241        MetricKind::Gauge => &[
1242            "harn_trigger_inflight",
1243            "harn_event_log_topic_size_bytes",
1244            "harn_event_log_consumer_lag",
1245            "harn_trigger_budget_cost_today_usd",
1246            "harn_worker_queue_depth",
1247            "harn_orchestrator_pump_backlog",
1248            "harn_orchestrator_pump_outstanding",
1249            "harn_trigger_oldest_pending_age_seconds",
1250            "harn_scheduler_deficit",
1251            "harn_scheduler_oldest_eligible_age_seconds",
1252        ],
1253        MetricKind::Histogram => &[
1254            "harn_http_request_duration_seconds",
1255            "harn_http_body_size_bytes",
1256            "harn_trigger_predicate_cost_usd",
1257            "harn_event_log_append_duration_seconds",
1258            "harn_a2a_hop_duration_seconds",
1259            "harn_worker_queue_claim_age_seconds",
1260            "harn_orchestrator_pump_admission_delay_seconds",
1261            "harn_trigger_webhook_accepted_to_normalized_seconds",
1262            "harn_trigger_webhook_accepted_to_queue_append_seconds",
1263            "harn_trigger_queue_age_at_dispatch_admission_seconds",
1264            "harn_trigger_queue_age_at_dispatch_start_seconds",
1265            "harn_trigger_dispatch_runtime_seconds",
1266            "harn_trigger_retry_delay_seconds",
1267            "harn_trigger_accepted_to_dlq_seconds",
1268        ],
1269    }
1270}
1271
1272fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1273    pairs
1274        .into_iter()
1275        .map(|(name, value)| (name.to_string(), value.to_string()))
1276        .collect()
1277}
1278
1279fn trigger_lifecycle_labels(
1280    trigger_id: &str,
1281    binding_key: &str,
1282    provider: &str,
1283    tenant_id: Option<&str>,
1284    status: &str,
1285) -> MetricLabels {
1286    labels([
1287        ("binding_key", binding_key),
1288        ("provider", provider),
1289        ("status", status),
1290        ("tenant_id", tenant_label(tenant_id)),
1291        ("trigger_id", trigger_id),
1292    ])
1293}
1294
1295fn trigger_pending_labels(
1296    trigger_id: &str,
1297    binding_key: &str,
1298    provider: &str,
1299    tenant_id: Option<&str>,
1300) -> MetricLabels {
1301    labels([
1302        ("binding_key", binding_key),
1303        ("provider", provider),
1304        ("tenant_id", tenant_label(tenant_id)),
1305        ("trigger_id", trigger_id),
1306    ])
1307}
1308
/// Metric label value for a tenant: the trimmed id, or `"none"` when the id
/// is absent or only whitespace.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1315
/// Elapsed time from `earlier_ms` to `later_ms` (both in milliseconds).
///
/// The subtraction saturates, and a negative difference yields zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(elapsed_ms).unwrap_or(0))
}
1319
1320fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1321    rendered.push_str(name);
1322    if !labels.is_empty() {
1323        rendered.push('{');
1324        for (index, (label, label_value)) in labels.iter().enumerate() {
1325            if index > 0 {
1326                rendered.push(',');
1327            }
1328            rendered.push_str(label);
1329            rendered.push_str("=\"");
1330            rendered.push_str(&escape_label_value(label_value));
1331            rendered.push('"');
1332        }
1333        rendered.push('}');
1334    }
1335    rendered.push(' ');
1336    rendered.push_str(&prometheus_float(value));
1337    rendered.push('\n');
1338}
1339
/// Escape a label value for the Prometheus text format.
///
/// Backslash, double quote, and line feed become `\\`, `\"`, and `\n`.
/// Every other character is copied unchanged. The output goes into one
/// pre-sized buffer instead of allocating a `Vec<char>` per input character.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1351
/// Format a sample value or bucket bound for the Prometheus text format.
///
/// Infinities render as `+Inf` / `-Inf` and NaN as `NaN`, the spellings the
/// exposition format requires. (Negative infinity previously fell through to
/// `{:.6}` and rendered as `-inf`.) Integral values render without a decimal
/// point. Any other value is rounded to six decimal places, then trailing
/// zeros and a bare trailing `.` are trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1366
1367/// Provider payload schema metadata exposed by a connector.
1368#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1369pub struct ProviderPayloadSchema {
1370    pub harn_schema_name: String,
1371    #[serde(default)]
1372    pub json_schema: JsonValue,
1373}
1374
1375impl ProviderPayloadSchema {
1376    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1377        Self {
1378            harn_schema_name: harn_schema_name.into(),
1379            json_schema,
1380        }
1381    }
1382
1383    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1384        Self::new(harn_schema_name, JsonValue::Null)
1385    }
1386}
1387
1388impl Default for ProviderPayloadSchema {
1389    fn default() -> Self {
1390        Self::named("raw")
1391    }
1392}
1393
1394/// High-level transport kind a connector supports.
1395#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1396#[serde(transparent)]
1397pub struct TriggerKind(String);
1398
1399impl TriggerKind {
1400    pub fn new(value: impl Into<String>) -> Self {
1401        Self(value.into())
1402    }
1403
1404    pub fn as_str(&self) -> &str {
1405        self.0.as_str()
1406    }
1407}
1408
1409impl From<&str> for TriggerKind {
1410    fn from(value: &str) -> Self {
1411        Self::new(value)
1412    }
1413}
1414
1415impl From<String> for TriggerKind {
1416    fn from(value: String) -> Self {
1417        Self::new(value)
1418    }
1419}
1420
1421/// Future trigger manifest binding routed to a connector activation.
1422#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1423pub struct TriggerBinding {
1424    pub provider: ProviderId,
1425    pub kind: TriggerKind,
1426    pub binding_id: String,
1427    #[serde(default)]
1428    pub dedupe_key: Option<String>,
1429    #[serde(default = "default_dedupe_retention_days")]
1430    pub dedupe_retention_days: u32,
1431    #[serde(default)]
1432    pub config: JsonValue,
1433}
1434
1435impl TriggerBinding {
1436    pub fn new(
1437        provider: ProviderId,
1438        kind: impl Into<TriggerKind>,
1439        binding_id: impl Into<String>,
1440    ) -> Self {
1441        Self {
1442            provider,
1443            kind: kind.into(),
1444            binding_id: binding_id.into(),
1445            dedupe_key: None,
1446            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1447            config: JsonValue::Null,
1448        }
1449    }
1450}
1451
1452fn default_dedupe_retention_days() -> u32 {
1453    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1454}
1455
1456/// Small in-memory trigger-binding registry used to fan bindings into connectors.
1457#[derive(Clone, Debug, Default)]
1458pub struct TriggerRegistry {
1459    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1460}
1461
1462impl TriggerRegistry {
1463    pub fn register(&mut self, binding: TriggerBinding) {
1464        self.bindings
1465            .entry(binding.provider.clone())
1466            .or_default()
1467            .push(binding);
1468    }
1469
1470    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1471        &self.bindings
1472    }
1473
1474    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1475        self.bindings
1476            .get(provider)
1477            .map(Vec::as_slice)
1478            .unwrap_or(&[])
1479    }
1480}
1481
1482/// Metadata returned from connector activation.
1483#[derive(Clone, Debug, PartialEq, Eq)]
1484pub struct ActivationHandle {
1485    pub provider: ProviderId,
1486    pub binding_count: usize,
1487}
1488
1489impl ActivationHandle {
1490    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1491        Self {
1492            provider,
1493            binding_count,
1494        }
1495    }
1496}
1497
1498/// Provider-native inbound request payload preserved as raw bytes.
1499#[derive(Clone, Debug, PartialEq)]
1500pub struct RawInbound {
1501    pub kind: String,
1502    pub headers: BTreeMap<String, String>,
1503    pub query: BTreeMap<String, String>,
1504    pub body: Vec<u8>,
1505    pub received_at: OffsetDateTime,
1506    pub occurred_at: Option<OffsetDateTime>,
1507    pub tenant_id: Option<TenantId>,
1508    pub metadata: JsonValue,
1509}
1510
1511impl RawInbound {
1512    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1513        Self {
1514            kind: kind.into(),
1515            headers,
1516            query: BTreeMap::new(),
1517            body,
1518            received_at: clock::now_utc(),
1519            occurred_at: None,
1520            tenant_id: None,
1521            metadata: JsonValue::Null,
1522        }
1523    }
1524
1525    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1526        Ok(serde_json::from_slice(&self.body)?)
1527    }
1528}
1529
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens a bucket can hold.
    pub capacity: u32,
    /// Tokens added per `refill_interval`.
    pub refill_tokens: u32,
    /// Period over which `refill_tokens` tokens are added.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Capacity of 60 tokens, refilled at one token per second.
    fn default() -> Self {
        RateLimitConfig {
            refill_interval: StdDuration::from_millis(1_000),
            refill_tokens: 1,
            capacity: 60,
        }
    }
}
1547
1548#[derive(Clone, Debug)]
1549struct TokenBucket {
1550    tokens: f64,
1551    last_refill: ClockInstant,
1552}
1553
1554impl TokenBucket {
1555    fn full(config: RateLimitConfig) -> Self {
1556        Self {
1557            tokens: config.capacity as f64,
1558            last_refill: clock::instant_now(),
1559        }
1560    }
1561
1562    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1563        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1564        let rate = config.refill_tokens.max(1) as f64 / interval;
1565        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1566        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1567        self.last_refill = now;
1568    }
1569
1570    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1571        self.refill(config, now);
1572        if self.tokens >= 1.0 {
1573            self.tokens -= 1.0;
1574            true
1575        } else {
1576            false
1577        }
1578    }
1579
1580    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1581        if self.tokens >= 1.0 {
1582            return StdDuration::ZERO;
1583        }
1584        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1585        let rate = config.refill_tokens.max(1) as f64 / interval;
1586        let missing = (1.0 - self.tokens).max(0.0);
1587        StdDuration::from_secs_f64((missing / rate).max(0.001))
1588    }
1589}
1590
1591/// Shared per-provider, per-key token bucket factory for outbound connector clients.
1592#[derive(Debug)]
1593pub struct RateLimiterFactory {
1594    config: RateLimitConfig,
1595    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1596}
1597
1598impl RateLimiterFactory {
1599    pub fn new(config: RateLimitConfig) -> Self {
1600        Self {
1601            config,
1602            buckets: Mutex::new(HashMap::new()),
1603        }
1604    }
1605
1606    pub fn config(&self) -> RateLimitConfig {
1607        self.config
1608    }
1609
1610    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1611        ScopedRateLimiter {
1612            factory: self,
1613            provider: provider.clone(),
1614            key: key.into(),
1615        }
1616    }
1617
1618    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1619        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1620        let bucket = buckets
1621            .entry((provider.as_str().to_string(), key.to_string()))
1622            .or_insert_with(|| TokenBucket::full(self.config));
1623        bucket.try_acquire(self.config, clock::instant_now())
1624    }
1625
1626    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1627        loop {
1628            let wait = {
1629                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1630                let bucket = buckets
1631                    .entry((provider.as_str().to_string(), key.to_string()))
1632                    .or_insert_with(|| TokenBucket::full(self.config));
1633                if bucket.try_acquire(self.config, clock::instant_now()) {
1634                    return;
1635                }
1636                bucket.wait_duration(self.config)
1637            };
1638            tokio::time::sleep(wait).await;
1639        }
1640    }
1641}
1642
1643impl Default for RateLimiterFactory {
1644    fn default() -> Self {
1645        Self::new(RateLimitConfig::default())
1646    }
1647}
1648
1649/// Borrowed view onto a single provider/key rate-limit scope.
1650#[derive(Clone, Debug)]
1651pub struct ScopedRateLimiter<'a> {
1652    factory: &'a RateLimiterFactory,
1653    provider: ProviderId,
1654    key: String,
1655}
1656
1657impl<'a> ScopedRateLimiter<'a> {
1658    pub fn try_acquire(&self) -> bool {
1659        self.factory.try_acquire(&self.provider, &self.key)
1660    }
1661
1662    pub async fn acquire(&self) {
1663        self.factory.acquire(&self.provider, &self.key).await;
1664    }
1665}
1666
1667/// Runtime connector registry keyed by provider id.
1668pub struct ConnectorRegistry {
1669    connectors: BTreeMap<ProviderId, ConnectorHandle>,
1670}
1671
1672impl ConnectorRegistry {
1673    pub fn empty() -> Self {
1674        Self {
1675            connectors: BTreeMap::new(),
1676        }
1677    }
1678
1679    pub fn with_defaults() -> Self {
1680        let mut registry = Self::empty();
1681        for provider in registered_provider_metadata() {
1682            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1683                continue;
1684            }
1685            registry
1686                .register(default_connector_for_provider(&provider))
1687                .expect("default connector registration should not fail");
1688        }
1689        registry
1690    }
1691
1692    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1693        let provider = connector.provider_id().clone();
1694        if self.connectors.contains_key(&provider) {
1695            return Err(ConnectorError::DuplicateProvider(provider.0));
1696        }
1697        self.connectors
1698            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1699        Ok(())
1700    }
1701
1702    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1703        self.connectors.get(id).cloned()
1704    }
1705
1706    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1707        self.connectors.remove(id)
1708    }
1709
1710    pub fn list(&self) -> Vec<ProviderId> {
1711        self.connectors.keys().cloned().collect()
1712    }
1713
1714    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1715        for connector in self.connectors.values() {
1716            connector.lock().await.init(ctx.clone()).await?;
1717        }
1718        Ok(())
1719    }
1720
1721    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1722        let mut clients = BTreeMap::new();
1723        for (provider, connector) in &self.connectors {
1724            let client = connector.lock().await.client();
1725            clients.insert(provider.clone(), client);
1726        }
1727        clients
1728    }
1729
1730    pub async fn activate_all(
1731        &self,
1732        registry: &TriggerRegistry,
1733    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1734        let mut handles = Vec::new();
1735        for (provider, connector) in &self.connectors {
1736            let bindings = registry.bindings_for(provider);
1737            if bindings.is_empty() {
1738                continue;
1739            }
1740            let connector = connector.lock().await;
1741            handles.push(connector.activate(bindings).await?);
1742        }
1743        Ok(handles)
1744    }
1745}
1746
1747impl Default for ConnectorRegistry {
1748    fn default() -> Self {
1749        Self::with_defaults()
1750    }
1751}
1752
1753fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1754    match &provider.runtime {
1755        ProviderRuntimeMetadata::Builtin {
1756            connector,
1757            default_signature_variant,
1758        } => match connector.as_str() {
1759            "a2a-push" => Box::new(A2aPushConnector::new()),
1760            "cron" => Box::new(CronConnector::new()),
1761            "stream" => Box::new(StreamConnector::new(
1762                ProviderId::from(provider.provider.clone()),
1763                provider.schema_name.clone(),
1764            )),
1765            "webhook" => {
1766                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1767                    .expect("catalog webhook signature variant must be valid");
1768                Box::new(GenericWebhookConnector::with_profile(
1769                    WebhookProviderProfile::new(
1770                        ProviderId::from(provider.provider.clone()),
1771                        provider.schema_name.clone(),
1772                        variant,
1773                    ),
1774                ))
1775            }
1776            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1777        },
1778        ProviderRuntimeMetadata::Placeholder => {
1779            Box::new(PlaceholderConnector::from_metadata(provider))
1780        }
1781    }
1782}
1783
1784struct PlaceholderConnector {
1785    provider_id: ProviderId,
1786    kinds: Vec<TriggerKind>,
1787    schema_name: String,
1788}
1789
1790impl PlaceholderConnector {
1791    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1792        Self {
1793            provider_id: ProviderId::from(metadata.provider.clone()),
1794            kinds: metadata
1795                .kinds
1796                .iter()
1797                .cloned()
1798                .map(TriggerKind::from)
1799                .collect(),
1800            schema_name: metadata.schema_name.clone(),
1801        }
1802    }
1803}
1804
1805struct PlaceholderClient;
1806
1807#[async_trait]
1808impl ConnectorClient for PlaceholderClient {
1809    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1810        Err(ClientError::Other(format!(
1811            "connector client method '{method}' is not implemented for this provider"
1812        )))
1813    }
1814}
1815
1816#[async_trait]
1817impl Connector for PlaceholderConnector {
1818    fn provider_id(&self) -> &ProviderId {
1819        &self.provider_id
1820    }
1821
1822    fn kinds(&self) -> &[TriggerKind] {
1823        &self.kinds
1824    }
1825
1826    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1827        Ok(())
1828    }
1829
1830    async fn activate(
1831        &self,
1832        bindings: &[TriggerBinding],
1833    ) -> Result<ActivationHandle, ConnectorError> {
1834        Ok(ActivationHandle::new(
1835            self.provider_id.clone(),
1836            bindings.len(),
1837        ))
1838    }
1839
1840    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1841        Err(ConnectorError::Unsupported(format!(
1842            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1843            self.provider_id.as_str()
1844        )))
1845    }
1846
1847    fn payload_schema(&self) -> ProviderPayloadSchema {
1848        ProviderPayloadSchema::named(self.schema_name.clone())
1849    }
1850
1851    fn client(&self) -> Arc<dyn ConnectorClient> {
1852        Arc::new(PlaceholderClient)
1853    }
1854}
1855
1856pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1857    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1858        *slot.borrow_mut() = clients
1859            .into_iter()
1860            .map(|(provider, client)| (provider.as_str().to_string(), client))
1861            .collect();
1862    });
1863}
1864
1865pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1866    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1867}
1868
1869pub fn clear_active_connector_clients() {
1870    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1871}
1872
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client double that echoes the invoked method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            let echoed = json!({ "method": method });
            Ok(echoed)
        }
    }

    /// Connector double that counts how often `activate` is invoked.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            let provider_id = ProviderId::from(provider_id);
            let kinds = vec![TriggerKind::from("webhook")];
            Self {
                provider_id,
                kinds,
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            self.kinds.as_slice()
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            // Count the call so tests can check which connectors were activated.
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            let handle = ActivationHandle::new(self.provider_id.clone(), bindings.len());
            Ok(handle)
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            let reason = String::from("not needed for registry tests");
            Err(ConnectorError::Unsupported(reason))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let first = FakeConnector::new("github", Arc::clone(&activate_calls));
        registry.register(Box::new(first)).unwrap();

        let duplicate = FakeConnector::new("github", activate_calls);
        let error = registry.register(Box::new(duplicate)).unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        for (provider, calls) in [("github", &github_calls), ("slack", &slack_calls)] {
            registry
                .register(Box::new(FakeConnector::new(provider, Arc::clone(calls))))
                .unwrap();
        }

        // Only GitHub receives bindings; Slack is registered but left unbound.
        let mut trigger_registry = TriggerRegistry::default();
        for event in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                event,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        let github = &handles[0];
        assert_eq!(github.provider.as_str(), "github");
        assert_eq!(github.binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });
        let acquire =
            |provider: &str, key: &str| factory.try_acquire(&ProviderId::from(provider), key);

        // A one-token bucket is spent after a single acquire for the same pair...
        assert!(acquire("github", "org:1"));
        assert!(!acquire("github", "org:1"));
        // ...while a different key or a different provider has its own bucket.
        assert!(acquire("github", "org:2"));
        assert!(acquire("slack", "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers = BTreeMap::from([(
            "Content-Type".to_string(),
            "application/json".to_string(),
        )]);
        let body = br#"{"ok":true}"#.to_vec();
        let raw = RawInbound::new("push", headers, body);

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    #[test]
    fn connector_registry_lists_core_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();

        for core in ["cron", "webhook", "kafka"] {
            assert!(providers.contains(&ProviderId::from(core)));
        }
        // GitHub and Slack are not expected among the default registry's providers.
        for service in ["github", "slack"] {
            assert!(!providers.contains(&ProviderId::from(service)));
        }
    }

    #[test]
    fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
        const CORE_RUNTIME_PROVIDERS: [&str; 9] = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        let builtin_providers = registered_provider_metadata()
            .into_iter()
            .filter(|provider| {
                matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. })
            });

        for provider in builtin_providers {
            assert!(
                CORE_RUNTIME_PROVIDERS.contains(&provider.provider.as_str()),
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        const TRIGGER: &str = "github-new-issue";
        const BINDING: &str = "github-new-issue@v7";
        const PROVIDER: &str = "github";
        const TENANT: Option<&str> = Some("tenant-a");
        const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
        const INBOX_TOPIC: &str = "trigger.inbox.envelopes";

        let metrics = MetricsRegistry::default();

        // HTTP ingress.
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );

        // Trigger lifecycle counters and gauges.
        metrics.record_trigger_received(TRIGGER, PROVIDER);
        metrics.record_trigger_deduped(TRIGGER, "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation(TRIGGER, true, 0.002);
        metrics.record_trigger_dispatched(TRIGGER, "local", "succeeded");
        metrics.record_trigger_retry(TRIGGER, 2);
        metrics.record_trigger_dlq(TRIGGER, "retry_exhausted");
        metrics.set_trigger_inflight(TRIGGER, 0);

        // Event log.
        metrics.record_event_log_append(PENDING_TOPIC, StdDuration::from_millis(1), 2048);
        metrics.set_event_log_consumer_lag(PENDING_TOPIC, "orchestrator-pump", 0);

        // Budget, A2A, worker queue and pump metrics.
        metrics.set_trigger_budget_cost_today(TRIGGER, 0.002);
        metrics.record_trigger_budget_exhausted(TRIGGER, "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog(INBOX_TOPIC, 2);
        metrics.set_orchestrator_pump_outstanding(INBOX_TOPIC, 1);
        metrics.record_orchestrator_pump_admission_delay(INBOX_TOPIC, StdDuration::from_millis(50));

        // Per-binding latency histograms.
        metrics.record_trigger_accepted_to_normalized(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        // The oldest-pending needle below expects an age of 3 for these timestamps.
        metrics.note_trigger_pending_event(
            "evt-1",
            TRIGGER,
            BINDING,
            PROVIDER,
            TENANT,
            1_000,
            4_000,
        );

        // LLM usage.
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let expected = [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ];

        let rendered = metrics.render_prometheus();
        if let Some(needle) = expected.iter().find(|needle| !rendered.contains(**needle)) {
            panic!("missing {needle}\n{rendered}");
        }
    }
}