// harn_vm/connectors/mod.rs
1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47    connector_export_denied_builtin_reason, connector_export_effect_class,
48    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61    SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
/// Hard per-request timeout applied to every outbound connector HTTP call.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
/// Third-party provider ids that still ship Rust-side compatibility
/// connectors while the pure-Harn connector packages complete their
/// deprecation soak. New service connectors should be Harn packages that
/// register through `[[providers]] connector = { harn = "..." }` instead.
pub const RUST_PROVIDER_CONNECTOR_COMPAT_PROVIDERS: &[&str] =
    &["github", "linear", "notion", "slack"];

/// Returns `true` when `provider` still has a Rust-side compatibility
/// connector shipped in this crate.
pub fn is_rust_provider_connector_compat_provider(provider: &str) -> bool {
    RUST_PROVIDER_CONNECTOR_COMPAT_PROVIDERS
        .iter()
        .any(|compat| *compat == provider)
}
84
85pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
86    reqwest::Client::builder()
87        .user_agent(user_agent)
88        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
89        .redirect(reqwest::redirect::Policy::custom(|attempt| {
90            if attempt.previous().len() >= 10 {
91                attempt.error("too many redirects")
92            } else if crate::egress::redirect_url_allowed(
93                "connector_redirect",
94                attempt.url().as_str(),
95            ) {
96                attempt.follow()
97            } else {
98                attempt.error("egress policy blocked redirect target")
99            }
100        }))
101        .build()
102        .expect("connector HTTP client configuration should be valid")
103}
104
/// Shared owned handle to a connector instance registered with the runtime.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to an outbound connector client.
    // NOTE(review): readers/writers of this map are not visible in this
    // module chunk — confirm population site before relying on contents.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
112
/// Provider implementation contract for inbound connectors.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable provider id such as `github`, `slack`, or `webhook`.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
    fn kinds(&self) -> &[TriggerKind];

    /// Called once per connector instance at orchestrator startup.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the bindings relevant to this connector instance.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Stop connector-owned background work and flush any connector-local state.
    ///
    /// Default implementation is a no-op; `_deadline` is advisory for
    /// implementations that need bounded shutdown.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Verify + normalize a provider-native inbound request into the richer
    /// connector result contract used by ack-first webhook adapters.
    ///
    /// Default implementation delegates to [`Connector::normalize_inbound`]
    /// and wraps the single resulting event.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Payload schema surfaced to future trigger-type narrowing.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Outbound API wrapper exposed to handlers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
156
157/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
158#[derive(Clone, Debug, PartialEq)]
159pub struct ConnectorHttpResponse {
160    pub status: u16,
161    pub headers: BTreeMap<String, String>,
162    pub body: JsonValue,
163}
164
165impl ConnectorHttpResponse {
166    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
167        Self {
168            status,
169            headers,
170            body,
171        }
172    }
173}
174
175/// Normalized inbound result accepted by the runtime connector adapter.
176#[derive(Clone, Debug, PartialEq)]
177pub enum ConnectorNormalizeResult {
178    Event(Box<TriggerEvent>),
179    Batch(Vec<TriggerEvent>),
180    ImmediateResponse {
181        response: ConnectorHttpResponse,
182        events: Vec<TriggerEvent>,
183    },
184    Reject(ConnectorHttpResponse),
185}
186
187impl ConnectorNormalizeResult {
188    pub fn event(event: TriggerEvent) -> Self {
189        Self::Event(Box::new(event))
190    }
191
192    pub fn into_events(self) -> Vec<TriggerEvent> {
193        match self {
194            Self::Event(event) => vec![*event],
195            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
196            Self::Reject(_) => Vec::new(),
197        }
198    }
199}
200
/// Outcome of post-normalization dedupe processing for a single event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed (or skipped) dedupe and should proceed to dispatch.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed within the TTL; drop it.
    DuplicateDropped,
}
206
207pub async fn postprocess_normalized_event(
208    inbox: &InboxIndex,
209    binding_id: &str,
210    dedupe_enabled: bool,
211    dedupe_ttl: StdDuration,
212    mut event: TriggerEvent,
213) -> Result<PostNormalizeOutcome, ConnectorError> {
214    if dedupe_enabled && !event.dedupe_claimed() {
215        if !inbox
216            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
217            .await?
218        {
219            return Ok(PostNormalizeOutcome::DuplicateDropped);
220        }
221        event.mark_dedupe_claimed();
222    }
223
224    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
225}
226
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke provider method `method` with JSON `args`, returning the
    /// provider's JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
232
/// Minimal outbound client errors shared by connector implementations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested provider method does not exist.
    MethodNotFound(String),
    /// Arguments failed validation.
    InvalidArgs(String),
    /// The provider throttled the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// The egress policy refused the outbound request.
    EgressBlocked(crate::egress::EgressBlocked),
    /// Catch-all for errors that fit no other variant.
    Other(String),
}
243
244impl fmt::Display for ClientError {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        match self {
247            Self::MethodNotFound(message)
248            | Self::InvalidArgs(message)
249            | Self::RateLimited(message)
250            | Self::Transport(message)
251            | Self::Other(message) => message.fmt(f),
252            Self::EgressBlocked(blocked) => blocked.fmt(f),
253        }
254    }
255}
256
257impl std::error::Error for ClientError {}
258
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A provider id was registered twice.
    DuplicateProvider(String),
    /// An inbound delivery was already processed (dedupe hit).
    DuplicateDelivery(String),
    /// A provider id was referenced but never registered.
    UnknownProvider(String),
    /// A required HTTP header was absent from the inbound request.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// A signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup/resolution failure.
    Secret(String),
    /// Event-log failure.
    EventLog(String),
    /// Failure surfaced from the Harn runtime.
    HarnRuntime(String),
    /// Outbound client call failure.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}

impl ConnectorError {
    /// Convenience constructor for [`ConnectorError::InvalidSignature`].
    pub fn invalid_signature(message: impl Into<String>) -> Self {
        Self::InvalidSignature(message.into())
    }
}
290
291impl fmt::Display for ConnectorError {
292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293        match self {
294            Self::DuplicateProvider(provider) => {
295                write!(f, "connector provider `{provider}` is already registered")
296            }
297            Self::DuplicateDelivery(message) => message.fmt(f),
298            Self::UnknownProvider(provider) => {
299                write!(f, "connector provider `{provider}` is not registered")
300            }
301            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
302            Self::InvalidHeader { name, detail } => {
303                write!(f, "invalid header `{name}`: {detail}")
304            }
305            Self::InvalidSignature(message)
306            | Self::Json(message)
307            | Self::Secret(message)
308            | Self::EventLog(message)
309            | Self::HarnRuntime(message)
310            | Self::Unsupported(message)
311            | Self::Activation(message) => message.fmt(f),
312            Self::TimestampOutOfWindow {
313                timestamp,
314                now,
315                window,
316            } => write!(
317                f,
318                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
319            ),
320            Self::Client(error) => error.fmt(f),
321        }
322    }
323}
324
325impl std::error::Error for ConnectorError {}
326
327impl From<crate::event_log::LogError> for ConnectorError {
328    fn from(value: crate::event_log::LogError) -> Self {
329        Self::EventLog(value.to_string())
330    }
331}
332
333impl From<crate::secrets::SecretError> for ConnectorError {
334    fn from(value: crate::secrets::SecretError) -> Self {
335        Self::Secret(value.to_string())
336    }
337}
338
339impl From<serde_json::Error> for ConnectorError {
340    fn from(value: serde_json::Error) -> Self {
341        Self::Json(value.to_string())
342    }
343}
344
345impl From<ClientError> for ConnectorError {
346    fn from(value: ClientError) -> Self {
347        Self::Client(value)
348    }
349}
350
/// Startup context shared with connector instances.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Source of provider credentials and signing secrets.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Shared metrics sink for connector-local counters and histograms.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
360
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Field values are point-in-time reads of the corresponding
/// `MetricsRegistry` atomics; see [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
377
// Label set attached to a metric sample; BTreeMap gives deterministic
// ordering so label sets can serve as map keys.
type MetricLabels = BTreeMap<String, String>;

/// In-memory histogram state: per-bucket counts plus running count/sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // NOTE(review): bucket keys look like stringified bucket bounds, but the
    // populating code (`observe_histogram`) is outside this chunk — confirm.
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
386
387static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
388
389pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
390    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
391    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
392}
393
394pub fn clear_active_metrics_registry() {
395    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
396        *slot.lock().expect("active metrics registry poisoned") = None;
397    }
398}
399
400pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
401    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
402        slot.lock()
403            .expect("active metrics registry poisoned")
404            .clone()
405    })
406}
407
408/// Shared metrics surface for connector-local counters and timings.
409#[derive(Debug, Default)]
410pub struct MetricsRegistry {
411    inbox_claims_written: AtomicU64,
412    inbox_duplicates_rejected: AtomicU64,
413    inbox_fast_path_hits: AtomicU64,
414    inbox_durable_hits: AtomicU64,
415    inbox_expired_entries: AtomicU64,
416    inbox_active_entries: AtomicU64,
417    linear_timestamp_rejections_total: AtomicU64,
418    dispatch_succeeded_total: AtomicU64,
419    dispatch_failed_total: AtomicU64,
420    retry_scheduled_total: AtomicU64,
421    slack_delivery_success_total: AtomicU64,
422    slack_delivery_failure_total: AtomicU64,
423    custom_counters: Mutex<BTreeMap<String, u64>>,
424    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
425    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
426    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
427    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
428}
429
430impl MetricsRegistry {
    /// Bucket upper bounds (seconds) for short operation latencies.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket upper bounds (seconds) for end-to-end trigger lifecycle
    /// latencies, extending into multi-minute tails.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket upper bounds (bytes) for payload/body sizes, 128 B to 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
438
439    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
440        ConnectorMetricsSnapshot {
441            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
442            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
443            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
444            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
445            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
446            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
447            linear_timestamp_rejections_total: self
448                .linear_timestamp_rejections_total
449                .load(Ordering::Relaxed),
450            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
451            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
452            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
453            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
454            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
455        }
456    }
457
    /// Count a dedupe claim successfully written to the inbox.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate rejected by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate rejected by the durable index.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count `count` inbox entries that expired; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrite the gauge of currently active inbox entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Count a Linear timestamp verification rejection.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful trigger dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed trigger dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a dispatch retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
513
514    pub fn record_custom_counter(&self, name: &str, amount: u64) {
515        if amount == 0 {
516            return;
517        }
518        let mut counters = self
519            .custom_counters
520            .lock()
521            .expect("custom counters poisoned");
522        *counters.entry(name.to_string()).or_default() += amount;
523    }
524
525    pub fn record_http_request(
526        &self,
527        endpoint: &str,
528        method: &str,
529        status: u16,
530        duration: StdDuration,
531        body_size_bytes: usize,
532    ) {
533        self.increment_counter(
534            "harn_http_requests_total",
535            labels([
536                ("endpoint", endpoint),
537                ("method", method),
538                ("status", &status.to_string()),
539            ]),
540            1,
541        );
542        self.observe_histogram(
543            "harn_http_request_duration_seconds",
544            labels([("endpoint", endpoint)]),
545            duration.as_secs_f64(),
546            &Self::DURATION_BUCKETS,
547        );
548        self.observe_histogram(
549            "harn_http_body_size_bytes",
550            labels([("endpoint", endpoint)]),
551            body_size_bytes as f64,
552            &Self::SIZE_BUCKETS,
553        );
554    }
555
    /// Count an inbound event received for `trigger_id` from `provider`.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count an event dropped by dedupe, tagged with the rejection reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Count a predicate evaluation by boolean result and observe its cost
    /// in USD (clamped non-negative).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count a dispatch, labelled by handler kind and outcome string.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count a retry, labelled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count an event routed to the dead-letter queue, tagged with the reason.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
624
    /// Observe accepted-to-normalized latency for a trigger event.
    pub fn record_trigger_accepted_to_normalized(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe accepted-to-queue-append latency for a trigger event.
    pub fn record_trigger_accepted_to_queue_append(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe how long an event had been queued when dispatch admitted it.
    pub fn record_trigger_queue_age_at_dispatch_admission(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe how long an event had been queued when dispatch started.
    pub fn record_trigger_queue_age_at_dispatch_start(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe the wall-clock runtime of a trigger dispatch.
    pub fn record_trigger_dispatch_runtime(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_dispatch_runtime_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe the delay scheduled before a retry attempt.
    pub fn record_trigger_retry_delay(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_retry_delay_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observe accepted-to-DLQ latency for an event that dead-lettered.
    pub fn record_trigger_accepted_to_dlq(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_accepted_to_dlq_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }
743
    /// Register `event_id` as pending under the trigger's label set.
    ///
    /// Stores the event's accepted-at timestamp (ms) in the pending map and
    /// then refreshes the oldest-pending gauge for that label set.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Inner scope drops the mutex guard before the gauge refresh below.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Remove `event_id` from the pending map (dropping the label-set entry
    /// once empty) and refresh the oldest-pending gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Inner scope drops the mutex guard before the gauge refresh below.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
792
    /// Set the in-flight dispatch gauge for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Set today's accumulated budget spend gauge (clamped non-negative).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Count a budget-exhaustion event, tagged with the enforcement strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Count a backpressure action taken along the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
824
    /// Observe an event-log append duration and record the payload size.
    ///
    /// NOTE(review): the `harn_event_log_topic_size_bytes` gauge is set to
    /// the individual append's payload size, not a cumulative topic size —
    /// confirm that is the intended semantics for this gauge name.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Set the consumer-lag gauge for a topic/consumer pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }
851
    /// Count an A2A hop by target/outcome and observe its duration.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Set the depth gauge for a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observe how old a queue item was when claimed (clamped non-negative).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
882
    /// Increment the scheduler-selection counter for a particular fairness key.
    pub fn record_scheduler_selection(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_selections_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Increment the scheduler-deferred counter (queue had work but couldn't
    /// be selected because the key was at its concurrency cap).
    pub fn record_scheduler_deferral(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_deferrals_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Increment the scheduler starvation-promotion counter for a key that
    /// was bumped ahead to avoid starvation.
    pub fn record_scheduler_starvation_promotion(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_starvation_promotions_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Set the current scheduler deficit gauge for a fairness key.
    pub fn set_scheduler_deficit(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        deficit: i64,
    ) {
        self.set_gauge(
            "harn_scheduler_deficit",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            deficit as f64,
        );
    }
956
957    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
958    pub fn set_scheduler_oldest_eligible_age(
959        &self,
960        queue: &str,
961        fairness_dimension: &str,
962        fairness_key: &str,
963        age_ms: u64,
964    ) {
965        self.set_gauge(
966            "harn_scheduler_oldest_eligible_age_seconds",
967            labels([
968                ("queue", queue),
969                ("fairness_dimension", fairness_dimension),
970                ("fairness_key", fairness_key),
971            ]),
972            age_ms as f64 / 1000.0,
973        );
974    }
975
976    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
977        self.set_gauge(
978            "harn_orchestrator_pump_backlog",
979            labels([("topic", topic)]),
980            count as f64,
981        );
982    }
983
984    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
985        self.set_gauge(
986            "harn_orchestrator_pump_outstanding",
987            labels([("topic", topic)]),
988            count as f64,
989        );
990    }
991
992    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
993        self.observe_histogram(
994            "harn_orchestrator_pump_admission_delay_seconds",
995            labels([("topic", topic)]),
996            duration.as_secs_f64(),
997            &Self::DURATION_BUCKETS,
998        );
999    }
1000
1001    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
1002        self.increment_counter(
1003            "harn_llm_calls_total",
1004            labels([
1005                ("provider", provider),
1006                ("model", model),
1007                ("outcome", outcome),
1008            ]),
1009            1,
1010        );
1011        if cost_usd > 0.0 {
1012            self.increment_counter(
1013                "harn_llm_cost_usd_total",
1014                labels([("provider", provider), ("model", model)]),
1015                cost_usd,
1016            );
1017        } else {
1018            self.ensure_counter(
1019                "harn_llm_cost_usd_total",
1020                labels([("provider", provider), ("model", model)]),
1021            );
1022        }
1023    }
1024
1025    pub fn record_llm_cache_hit(&self, provider: &str) {
1026        self.increment_counter(
1027            "harn_llm_cache_hits_total",
1028            labels([("provider", provider)]),
1029            1,
1030        );
1031    }
1032
1033    pub fn render_prometheus(&self) -> String {
1034        let snapshot = self.snapshot();
1035        let counters = [
1036            (
1037                "connector_linear_timestamp_rejections_total",
1038                snapshot.linear_timestamp_rejections_total,
1039            ),
1040            (
1041                "dispatch_succeeded_total",
1042                snapshot.dispatch_succeeded_total,
1043            ),
1044            ("dispatch_failed_total", snapshot.dispatch_failed_total),
1045            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1046            ("retry_scheduled_total", snapshot.retry_scheduled_total),
1047            (
1048                "slack_events_delivery_success_total",
1049                snapshot.slack_delivery_success_total,
1050            ),
1051            (
1052                "slack_events_delivery_failure_total",
1053                snapshot.slack_delivery_failure_total,
1054            ),
1055        ];
1056
1057        let mut rendered = String::new();
1058        for (name, value) in counters {
1059            rendered.push_str("# TYPE ");
1060            rendered.push_str(name);
1061            rendered.push_str(" counter\n");
1062            rendered.push_str(name);
1063            rendered.push(' ');
1064            rendered.push_str(&value.to_string());
1065            rendered.push('\n');
1066        }
1067        let custom_counters = self
1068            .custom_counters
1069            .lock()
1070            .expect("custom counters poisoned");
1071        for (name, value) in custom_counters.iter() {
1072            let metric_name = format!(
1073                "connector_custom_{}_total",
1074                name.chars()
1075                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1076                        ch
1077                    } else {
1078                        '_'
1079                    })
1080                    .collect::<String>()
1081            );
1082            rendered.push_str("# TYPE ");
1083            rendered.push_str(&metric_name);
1084            rendered.push_str(" counter\n");
1085            rendered.push_str(&metric_name);
1086            rendered.push(' ');
1087            rendered.push_str(&value.to_string());
1088            rendered.push('\n');
1089        }
1090        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1091        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1092        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1093        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1094        self.render_generic_metrics(&mut rendered);
1095        rendered
1096    }
1097
1098    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1099        let amount = amount.into();
1100        if amount <= 0.0 || !amount.is_finite() {
1101            return;
1102        }
1103        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1104        *counters.entry((name.to_string(), labels)).or_default() += amount;
1105    }
1106
1107    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1108        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1109        counters.entry((name.to_string(), labels)).or_default();
1110    }
1111
1112    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1113        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1114        gauges.insert((name.to_string(), labels), value);
1115    }
1116
1117    fn observe_histogram(
1118        &self,
1119        name: &str,
1120        labels: MetricLabels,
1121        value: f64,
1122        bucket_bounds: &[f64],
1123    ) {
1124        if !value.is_finite() {
1125            return;
1126        }
1127        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1128        let histogram = histograms
1129            .entry((name.to_string(), labels))
1130            .or_insert_with(|| HistogramMetric {
1131                buckets: bucket_bounds
1132                    .iter()
1133                    .map(|bound| (prometheus_float(*bound), 0))
1134                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1135                    .collect(),
1136                count: 0,
1137                sum: 0.0,
1138            });
1139        histogram.count += 1;
1140        histogram.sum += value;
1141        for bound in bucket_bounds {
1142            if value <= *bound {
1143                let key = prometheus_float(*bound);
1144                *histogram.buckets.entry(key).or_default() += 1;
1145            }
1146        }
1147        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1148    }
1149
1150    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1151        let oldest_accepted_at_ms = self
1152            .pending_trigger_events
1153            .lock()
1154            .expect("pending trigger events poisoned")
1155            .get(&labels)
1156            .and_then(|events| events.values().min().copied());
1157        let age_seconds = oldest_accepted_at_ms
1158            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1159            .unwrap_or(0.0);
1160        self.set_gauge(
1161            "harn_trigger_oldest_pending_age_seconds",
1162            labels,
1163            age_seconds,
1164        );
1165    }
1166
    /// Append the generic counter/gauge/histogram families to `rendered`.
    ///
    /// Every family listed in `metric_family_names` gets a `# TYPE` header
    /// even when it has no samples yet; samples are filtered per family name.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone each table under its lock so no mutex is held while
        // string-building (rendering can be slow for large label sets).
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // NOTE(review): bucket lines render in the buckets map's key
                // order (string "le" values), which may not be ascending
                // numeric order — confirm scrapers tolerate this.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1228}
1229
/// Which Prometheus family type a generic metric name belongs to.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Static catalog of generic metric family names, grouped by kind.
///
/// `render_generic_metrics` walks these lists so every known family gets a
/// `# TYPE` header and samples are grouped under the right one.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTERS: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
        "harn_scheduler_selections_total",
        "harn_scheduler_deferrals_total",
        "harn_scheduler_starvation_promotions_total",
    ];
    const GAUGES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
        "harn_scheduler_deficit",
        "harn_scheduler_oldest_eligible_age_seconds",
    ];
    const HISTOGRAMS: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];
    match kind {
        MetricKind::Counter => COUNTERS,
        MetricKind::Gauge => GAUGES,
        MetricKind::Histogram => HISTOGRAMS,
    }
}
1287
1288fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1289    pairs
1290        .into_iter()
1291        .map(|(name, value)| (name.to_string(), value.to_string()))
1292        .collect()
1293}
1294
1295fn trigger_lifecycle_labels(
1296    trigger_id: &str,
1297    binding_key: &str,
1298    provider: &str,
1299    tenant_id: Option<&str>,
1300    status: &str,
1301) -> MetricLabels {
1302    labels([
1303        ("binding_key", binding_key),
1304        ("provider", provider),
1305        ("status", status),
1306        ("tenant_id", tenant_label(tenant_id)),
1307        ("trigger_id", trigger_id),
1308    ])
1309}
1310
1311fn trigger_pending_labels(
1312    trigger_id: &str,
1313    binding_key: &str,
1314    provider: &str,
1315    tenant_id: Option<&str>,
1316) -> MetricLabels {
1317    labels([
1318        ("binding_key", binding_key),
1319        ("provider", provider),
1320        ("tenant_id", tenant_label(tenant_id)),
1321        ("trigger_id", trigger_id),
1322    ])
1323}
1324
/// Normalize an optional tenant id into a label value.
///
/// Trims whitespace; missing or blank ids collapse to the sentinel `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1331
/// Non-negative duration between two millisecond timestamps.
///
/// Saturates on i64 overflow and clamps negative deltas (clock skew) to zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    if delta_ms <= 0 {
        StdDuration::ZERO
    } else {
        StdDuration::from_millis(delta_ms as u64)
    }
}
1335
1336fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1337    rendered.push_str(name);
1338    if !labels.is_empty() {
1339        rendered.push('{');
1340        for (index, (label, label_value)) in labels.iter().enumerate() {
1341            if index > 0 {
1342                rendered.push(',');
1343            }
1344            rendered.push_str(label);
1345            rendered.push_str("=\"");
1346            rendered.push_str(&escape_label_value(label_value));
1347            rendered.push('"');
1348        }
1349        rendered.push('}');
1350    }
1351    rendered.push(' ');
1352    rendered.push_str(&prometheus_float(value));
1353    rendered.push('\n');
1354}
1355
/// Escape a label value for the Prometheus text exposition format.
///
/// Backslash, double-quote, and newline are the three characters the format
/// requires escaping inside `label="value"` pairs. Builds the result with a
/// single pre-sized buffer instead of the previous per-character `Vec<char>`
/// flat_map, which allocated a vector for every input character.
fn escape_label_value(value: &str) -> String {
    // Escapes only grow the string, so the input length is a safe lower bound.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1367
/// Format an `f64` the way the Prometheus text format spells values.
///
/// Integral values render without a decimal point, fractional values with up
/// to six digits (trailing zeros and a dangling `.` trimmed). Non-finite
/// values use the canonical spellings `+Inf`, `-Inf`, and `NaN` — the
/// previous code only special-cased positive infinity, so negative infinity
/// fell through to `format!` and rendered as `-inf`.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1382
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Name identifying the payload schema within harn.
    pub harn_schema_name: String,
    /// JSON Schema body; `Null` when only the name is known (see `named`).
    #[serde(default)]
    pub json_schema: JsonValue,
}
1390
1391impl ProviderPayloadSchema {
1392    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1393        Self {
1394            harn_schema_name: harn_schema_name.into(),
1395            json_schema,
1396        }
1397    }
1398
1399    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1400        Self::new(harn_schema_name, JsonValue::Null)
1401    }
1402}
1403
/// The default payload schema is the name-only `"raw"` entry.
impl Default for ProviderPayloadSchema {
    fn default() -> Self {
        Self::named("raw")
    }
}
1409
/// High-level transport kind a connector supports.
// Serialized transparently, so on the wire this is just the inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1414
1415impl TriggerKind {
1416    pub fn new(value: impl Into<String>) -> Self {
1417        Self(value.into())
1418    }
1419
1420    pub fn as_str(&self) -> &str {
1421        self.0.as_str()
1422    }
1423}
1424
/// Lets call sites pass a `&str` where `impl Into<TriggerKind>` is accepted.
impl From<&str> for TriggerKind {
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}
1430
/// Lets call sites pass an owned `String` where `impl Into<TriggerKind>` is accepted.
impl From<String> for TriggerKind {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}
1436
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding is routed to.
    pub provider: ProviderId,
    /// Transport kind within the provider.
    pub kind: TriggerKind,
    /// Identifier for this binding within the manifest.
    pub binding_id: String,
    /// Optional dedupe key; `None` disables explicit dedupe keying.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// How long dedupe records are retained (defaults to the inbox default).
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific binding configuration; `Null` when unset.
    #[serde(default)]
    pub config: JsonValue,
}
1450
1451impl TriggerBinding {
1452    pub fn new(
1453        provider: ProviderId,
1454        kind: impl Into<TriggerKind>,
1455        binding_id: impl Into<String>,
1456    ) -> Self {
1457        Self {
1458            provider,
1459            kind: kind.into(),
1460            binding_id: binding_id.into(),
1461            dedupe_key: None,
1462            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1463            config: JsonValue::Null,
1464        }
1465    }
1466}
1467
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1471
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Grouped by provider; BTreeMap keeps provider iteration deterministic.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1477
1478impl TriggerRegistry {
1479    pub fn register(&mut self, binding: TriggerBinding) {
1480        self.bindings
1481            .entry(binding.provider.clone())
1482            .or_default()
1483            .push(binding);
1484    }
1485
1486    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1487        &self.bindings
1488    }
1489
1490    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1491        self.bindings
1492            .get(provider)
1493            .map(Vec::as_slice)
1494            .unwrap_or(&[])
1495    }
1496}
1497
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider whose bindings were activated.
    pub provider: ProviderId,
    /// Number of bindings the activation covered.
    pub binding_count: usize,
}
1504
1505impl ActivationHandle {
1506    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1507        Self {
1508            provider,
1509            binding_count,
1510        }
1511    }
1512}
1513
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Provider-specific kind string for this payload.
    pub kind: String,
    /// Request headers, normalized into an ordered map.
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters, normalized into an ordered map.
    pub query: BTreeMap<String, String>,
    /// Raw request body bytes (not yet parsed).
    pub body: Vec<u8>,
    /// When this process received the payload (stamped via `clock::now_utc`).
    pub received_at: OffsetDateTime,
    /// When the event occurred at the provider, if known.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant attribution, if already resolved.
    pub tenant_id: Option<TenantId>,
    /// Extra provider metadata; `Null` when unset.
    pub metadata: JsonValue,
}
1526
1527impl RawInbound {
1528    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1529        Self {
1530            kind: kind.into(),
1531            headers,
1532            query: BTreeMap::new(),
1533            body,
1534            received_at: clock::now_utc(),
1535            occurred_at: None,
1536            tenant_id: None,
1537            metadata: JsonValue::Null,
1538        }
1539    }
1540
1541    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1542        Ok(serde_json::from_slice(&self.body)?)
1543    }
1544}
1545
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens the bucket can hold (the burst size).
    pub capacity: u32,
    /// Tokens credited back every `refill_interval`.
    pub refill_tokens: u32,
    /// Interval at which `refill_tokens` tokens are restored.
    pub refill_interval: StdDuration,
}

/// Default: a burst of 60 with one token restored per second.
impl Default for RateLimitConfig {
    fn default() -> Self {
        RateLimitConfig {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1563
/// Mutable token-bucket state for one provider/key scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional tokens currently available; refilled continuously over time.
    tokens: f64,
    // Clock reading at the last refill, used to compute elapsed credit.
    last_refill: ClockInstant,
}
1569
impl TokenBucket {
    /// A bucket starting at full capacity, stamped with the current clock.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Advance the bucket to `now`, crediting tokens for elapsed time.
    ///
    /// The refill rate is `refill_tokens / refill_interval`; the interval is
    /// clamped to at least EPSILON seconds and the token count to at least 1,
    /// so a zeroed config cannot divide by zero or stall the bucket. Tokens
    /// are capped at `capacity` (also clamped to at least 1).
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refill to `now`, then take one token if at least one is available.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// How long until one full token will be available.
    ///
    /// Returns zero when a token is already available; otherwise at least
    /// 1ms, so async callers never busy-spin on a zero-length sleep.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1606
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    // One configuration shared by every bucket the factory creates.
    config: RateLimitConfig,
    // Lazily-created buckets keyed by (provider id, caller-chosen key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1613
1614impl RateLimiterFactory {
1615    pub fn new(config: RateLimitConfig) -> Self {
1616        Self {
1617            config,
1618            buckets: Mutex::new(HashMap::new()),
1619        }
1620    }
1621
1622    pub fn config(&self) -> RateLimitConfig {
1623        self.config
1624    }
1625
1626    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1627        ScopedRateLimiter {
1628            factory: self,
1629            provider: provider.clone(),
1630            key: key.into(),
1631        }
1632    }
1633
1634    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1635        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1636        let bucket = buckets
1637            .entry((provider.as_str().to_string(), key.to_string()))
1638            .or_insert_with(|| TokenBucket::full(self.config));
1639        bucket.try_acquire(self.config, clock::instant_now())
1640    }
1641
1642    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1643        loop {
1644            let wait = {
1645                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1646                let bucket = buckets
1647                    .entry((provider.as_str().to_string(), key.to_string()))
1648                    .or_insert_with(|| TokenBucket::full(self.config));
1649                if bucket.try_acquire(self.config, clock::instant_now()) {
1650                    return;
1651                }
1652                bucket.wait_duration(self.config)
1653            };
1654            tokio::time::sleep(wait).await;
1655        }
1656    }
1657}
1658
/// Defaults to a factory using `RateLimitConfig::default()` for every bucket.
impl Default for RateLimiterFactory {
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1664
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Factory that owns the shared buckets; this view only borrows it.
    factory: &'a RateLimiterFactory,
    // Scope identity captured at creation time.
    provider: ProviderId,
    key: String,
}
1672
impl<'a> ScopedRateLimiter<'a> {
    /// Non-blocking acquire on this scope's bucket; `true` if a token was taken.
    pub fn try_acquire(&self) -> bool {
        self.factory.try_acquire(&self.provider, &self.key)
    }

    /// Async acquire on this scope's bucket, waiting until a token is free.
    pub async fn acquire(&self) {
        self.factory.acquire(&self.provider, &self.key).await;
    }
}
1682
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    // One handle (shared async-locked connector) per provider, sorted by id.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1687
1688impl ConnectorRegistry {
1689    pub fn empty() -> Self {
1690        Self {
1691            connectors: BTreeMap::new(),
1692        }
1693    }
1694
1695    pub fn with_defaults() -> Self {
1696        let mut registry = Self::empty();
1697        for provider in registered_provider_metadata() {
1698            registry
1699                .register(default_connector_for_provider(&provider))
1700                .expect("default connector registration should not fail");
1701        }
1702        registry
1703    }
1704
1705    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1706        let provider = connector.provider_id().clone();
1707        if self.connectors.contains_key(&provider) {
1708            return Err(ConnectorError::DuplicateProvider(provider.0));
1709        }
1710        self.connectors
1711            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1712        Ok(())
1713    }
1714
1715    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1716        self.connectors.get(id).cloned()
1717    }
1718
1719    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1720        self.connectors.remove(id)
1721    }
1722
1723    pub fn list(&self) -> Vec<ProviderId> {
1724        self.connectors.keys().cloned().collect()
1725    }
1726
1727    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1728        for connector in self.connectors.values() {
1729            connector.lock().await.init(ctx.clone()).await?;
1730        }
1731        Ok(())
1732    }
1733
1734    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1735        let mut clients = BTreeMap::new();
1736        for (provider, connector) in &self.connectors {
1737            let client = connector.lock().await.client();
1738            clients.insert(provider.clone(), client);
1739        }
1740        clients
1741    }
1742
1743    pub async fn activate_all(
1744        &self,
1745        registry: &TriggerRegistry,
1746    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1747        let mut handles = Vec::new();
1748        for (provider, connector) in &self.connectors {
1749            let bindings = registry.bindings_for(provider);
1750            if bindings.is_empty() {
1751                continue;
1752            }
1753            let connector = connector.lock().await;
1754            handles.push(connector.activate(bindings).await?);
1755        }
1756        Ok(handles)
1757    }
1758}
1759
/// The default registry carries the full catalog of default connectors.
impl Default for ConnectorRegistry {
    fn default() -> Self {
        Self::with_defaults()
    }
}
1765
1766fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1767    // The provider catalog on main registers `github` with
1768    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1769    // before a native connector existed the catalog auto-wired a
1770    // GenericWebhookConnector. Now that #170 lands a first-class
1771    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1772    // provider_id "github" here and return the native connector instead of a
1773    // webhook-backed fallback. This keeps manifests that say
1774    // `provider = "github"` pointed at the new connector without requiring
1775    // users to switch to a distinct provider_id.
1776    if provider.provider == "github" {
1777        return Box::new(GitHubConnector::new());
1778    }
1779    if provider.provider == "linear" {
1780        return Box::new(LinearConnector::new());
1781    }
1782    if provider.provider == "slack" {
1783        return Box::new(SlackConnector::new());
1784    }
1785    if provider.provider == "notion" {
1786        return Box::new(NotionConnector::new());
1787    }
1788    if provider.provider == "a2a-push" {
1789        return Box::new(A2aPushConnector::new());
1790    }
1791    match &provider.runtime {
1792        ProviderRuntimeMetadata::Builtin {
1793            connector,
1794            default_signature_variant,
1795        } => match connector.as_str() {
1796            "cron" => Box::new(CronConnector::new()),
1797            "stream" => Box::new(StreamConnector::new(
1798                ProviderId::from(provider.provider.clone()),
1799                provider.schema_name.clone(),
1800            )),
1801            "webhook" => {
1802                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1803                    .expect("catalog webhook signature variant must be valid");
1804                Box::new(GenericWebhookConnector::with_profile(
1805                    WebhookProviderProfile::new(
1806                        ProviderId::from(provider.provider.clone()),
1807                        provider.schema_name.clone(),
1808                        variant,
1809                    ),
1810                ))
1811            }
1812            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1813        },
1814        ProviderRuntimeMetadata::Placeholder => {
1815            Box::new(PlaceholderConnector::from_metadata(provider))
1816        }
1817    }
1818}
1819
/// Fallback connector for providers that exist in the catalog but have no
/// concrete Rust implementation yet. Activation succeeds so bindings can be
/// registered, but `normalize_inbound` rejects every payload.
struct PlaceholderConnector {
    // Provider this placeholder stands in for.
    provider_id: ProviderId,
    // Trigger kinds advertised by the catalog entry.
    kinds: Vec<TriggerKind>,
    // Schema name echoed back by `payload_schema`.
    schema_name: String,
}
1825
1826impl PlaceholderConnector {
1827    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1828        Self {
1829            provider_id: ProviderId::from(metadata.provider.clone()),
1830            kinds: metadata
1831                .kinds
1832                .iter()
1833                .cloned()
1834                .map(TriggerKind::from)
1835                .collect(),
1836            schema_name: metadata.schema_name.clone(),
1837        }
1838    }
1839}
1840
/// Client paired with `PlaceholderConnector`; every call fails because the
/// provider has no concrete implementation.
struct PlaceholderClient;
1842
1843#[async_trait]
1844impl ConnectorClient for PlaceholderClient {
1845    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1846        Err(ClientError::Other(format!(
1847            "connector client method '{method}' is not implemented for this provider"
1848        )))
1849    }
1850}
1851
1852#[async_trait]
1853impl Connector for PlaceholderConnector {
1854    fn provider_id(&self) -> &ProviderId {
1855        &self.provider_id
1856    }
1857
1858    fn kinds(&self) -> &[TriggerKind] {
1859        &self.kinds
1860    }
1861
1862    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1863        Ok(())
1864    }
1865
1866    async fn activate(
1867        &self,
1868        bindings: &[TriggerBinding],
1869    ) -> Result<ActivationHandle, ConnectorError> {
1870        Ok(ActivationHandle::new(
1871            self.provider_id.clone(),
1872            bindings.len(),
1873        ))
1874    }
1875
1876    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1877        Err(ConnectorError::Unsupported(format!(
1878            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1879            self.provider_id.as_str()
1880        )))
1881    }
1882
1883    fn payload_schema(&self) -> ProviderPayloadSchema {
1884        ProviderPayloadSchema::named(self.schema_name.clone())
1885    }
1886
1887    fn client(&self) -> Arc<dyn ConnectorClient> {
1888        Arc::new(PlaceholderClient)
1889    }
1890}
1891
1892pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1893    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1894        *slot.borrow_mut() = clients
1895            .into_iter()
1896            .map(|(provider, client)| (provider.as_str().to_string(), client))
1897            .collect();
1898    });
1899}
1900
1901pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1902    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1903}
1904
1905pub fn clear_active_connector_clients() {
1906    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1907}
1908
// Unit tests covering the connector registry, the rate limiter, raw-inbound
// helpers, the pure-Harn catalog policy, and the Prometheus metrics surface.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client stub: echoes the method name so wiring can be observed.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test double that counts `activate` invocations via a shared atomic.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        // Builds a fake connector for `provider_id` advertising one webhook kind.
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Records the call, then hands back a handle sized by the bindings.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering two connectors for the same provider id must fail with
    // DuplicateProvider naming that provider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only connectors with at least one trigger binding get activated; the
    // unbound slack connector must not be touched.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        // Both github bindings flow into one activation; slack stays idle.
        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Token buckets are independent per (provider, key) pair: exhausting one
    // scope must not drain tokens from any other.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        // capacity 1 with a 60s refill means the second acquire in a scope fails.
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body` must parse the stored raw bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is seeded from the provider catalog and must
    // include the core builtin providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Policy guard: only core runtime providers (or grandfathered compat
    // providers) may register as Rust builtin connectors; everything new must
    // ship as a pure-Harn package.
    #[test]
    fn pure_harn_pivot_only_keeps_core_or_compat_builtin_connectors() {
        // Providers whose connectors legitimately live in the Rust runtime.
        let core_runtime_providers = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        for provider in registered_provider_metadata() {
            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
                continue;
            }

            let allowed_core = core_runtime_providers.contains(&provider.provider.as_str());
            let allowed_compat = is_rust_provider_connector_compat_provider(&provider.provider);
            assert!(
                allowed_core || allowed_compat,
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    // End-to-end smoke test of the metrics surface: record one sample per
    // metric family, then assert each expected Prometheus line (with its
    // exact label set and bucket boundary) appears in the rendered output.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Latency-stage histograms share the same label tuple:
        // (trigger_id, binding_key, provider, tenant, status, duration).
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        // Each needle pins the metric name, the full (alphabetized) label
        // set, and the exact bucket/value expected from the samples above.
        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}