// harn_vm/connectors/mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47    connector_export_denied_builtin_reason, connector_export_effect_class,
48    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61    SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
72const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
74pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
75    reqwest::Client::builder()
76        .user_agent(user_agent)
77        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
78        .redirect(reqwest::redirect::Policy::custom(|attempt| {
79            if attempt.previous().len() >= 10 {
80                attempt.error("too many redirects")
81            } else if crate::egress::redirect_url_allowed(
82                "connector_redirect",
83                attempt.url().as_str(),
84            ) {
85                attempt.follow()
86            } else {
87                attempt.error("egress policy blocked redirect target")
88            }
89        }))
90        .build()
91        .expect("connector HTTP client configuration should be valid")
92}
93
/// Shared owned handle to a connector instance registered with the runtime.
///
/// The async mutex serializes access to the boxed connector so `&mut self`
/// entry points such as [`Connector::init`] can run safely behind an `Arc`.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to the outbound client for a provider.
    // NOTE(review): population/lookup logic lives elsewhere in this module;
    // confirm the key format (provider id vs. binding id) at the use sites.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
101
/// Provider implementation contract for inbound connectors.
///
/// Implementations are registered with the runtime (see [`ConnectorHandle`])
/// and drive the full inbound lifecycle: startup (`init`), binding activation
/// (`activate`), inbound verification/normalization (`normalize_inbound`),
/// and shutdown.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable provider id such as `github`, `slack`, or `webhook`.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
    fn kinds(&self) -> &[TriggerKind];

    /// Called once per connector instance at orchestrator startup.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the bindings relevant to this connector instance.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Stop connector-owned background work and flush any connector-local state.
    ///
    /// The default implementation is a no-op that succeeds immediately; the
    /// `_deadline` budget is only meaningful to overriding implementations.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Verify + normalize a provider-native inbound request into the richer
    /// connector result contract used by ack-first webhook adapters.
    ///
    /// The default delegates to [`Connector::normalize_inbound`] and wraps the
    /// single event; connectors that batch or answer immediately override it.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Payload schema surfaced to future trigger-type narrowing.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Outbound API wrapper exposed to handlers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
145
146/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
147#[derive(Clone, Debug, PartialEq)]
148pub struct ConnectorHttpResponse {
149    pub status: u16,
150    pub headers: BTreeMap<String, String>,
151    pub body: JsonValue,
152}
153
154impl ConnectorHttpResponse {
155    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
156        Self {
157            status,
158            headers,
159            body,
160        }
161    }
162}
163
164/// Normalized inbound result accepted by the runtime connector adapter.
165#[derive(Clone, Debug, PartialEq)]
166pub enum ConnectorNormalizeResult {
167    Event(Box<TriggerEvent>),
168    Batch(Vec<TriggerEvent>),
169    ImmediateResponse {
170        response: ConnectorHttpResponse,
171        events: Vec<TriggerEvent>,
172    },
173    Reject(ConnectorHttpResponse),
174}
175
176impl ConnectorNormalizeResult {
177    pub fn event(event: TriggerEvent) -> Self {
178        Self::Event(Box::new(event))
179    }
180
181    pub fn into_events(self) -> Vec<TriggerEvent> {
182        match self {
183            Self::Event(event) => vec![*event],
184            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
185            Self::Reject(_) => Vec::new(),
186        }
187    }
188}
189
190#[derive(Clone, Debug, PartialEq)]
191pub enum PostNormalizeOutcome {
192    Ready(Box<TriggerEvent>),
193    DuplicateDropped,
194}
195
196pub async fn postprocess_normalized_event(
197    inbox: &InboxIndex,
198    binding_id: &str,
199    dedupe_enabled: bool,
200    dedupe_ttl: StdDuration,
201    mut event: TriggerEvent,
202) -> Result<PostNormalizeOutcome, ConnectorError> {
203    if dedupe_enabled && !event.dedupe_claimed() {
204        if !inbox
205            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
206            .await?
207        {
208            return Ok(PostNormalizeOutcome::DuplicateDropped);
209        }
210        event.mark_dedupe_claimed();
211    }
212
213    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
214}
215
/// Outbound provider client interface used by connector-backed stdlib modules.
///
/// `method` names a provider operation and `args` carries its JSON-encoded
/// arguments; implementations return the provider's JSON reply or a
/// [`ClientError`].
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
221
222/// Minimal outbound client errors shared by connector implementations.
223#[derive(Clone, Debug, PartialEq, Eq)]
224pub enum ClientError {
225    MethodNotFound(String),
226    InvalidArgs(String),
227    RateLimited(String),
228    Transport(String),
229    EgressBlocked(crate::egress::EgressBlocked),
230    Other(String),
231}
232
233impl fmt::Display for ClientError {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        match self {
236            Self::MethodNotFound(message)
237            | Self::InvalidArgs(message)
238            | Self::RateLimited(message)
239            | Self::Transport(message)
240            | Self::Other(message) => message.fmt(f),
241            Self::EgressBlocked(blocked) => blocked.fmt(f),
242        }
243    }
244}
245
246impl std::error::Error for ClientError {}
247
/// Shared connector-layer errors.
#[derive(Debug)]
pub enum ConnectorError {
    /// A provider id was registered with the runtime more than once.
    DuplicateProvider(String),
    /// A delivery was already processed; carries the rejection message.
    DuplicateDelivery(String),
    /// A lookup referenced a provider id that was never registered.
    UnknownProvider(String),
    /// A required inbound header was absent.
    MissingHeader(String),
    /// An inbound header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Webhook signature verification failed.
    InvalidSignature(String),
    /// An inbound timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure, stringified from `serde_json::Error`.
    Json(String),
    /// Secret lookup failure, stringified from `SecretError`.
    Secret(String),
    /// Event-log failure, stringified from `LogError`.
    EventLog(String),
    /// Failure reported by the harn runtime (message only).
    HarnRuntime(String),
    /// Outbound client call failure.
    Client(ClientError),
    /// The requested operation is not supported by this connector.
    Unsupported(String),
    /// Binding activation failed.
    Activation(String),
}
273
274impl ConnectorError {
275    pub fn invalid_signature(message: impl Into<String>) -> Self {
276        Self::InvalidSignature(message.into())
277    }
278}
279
280impl fmt::Display for ConnectorError {
281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282        match self {
283            Self::DuplicateProvider(provider) => {
284                write!(f, "connector provider `{provider}` is already registered")
285            }
286            Self::DuplicateDelivery(message) => message.fmt(f),
287            Self::UnknownProvider(provider) => {
288                write!(f, "connector provider `{provider}` is not registered")
289            }
290            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
291            Self::InvalidHeader { name, detail } => {
292                write!(f, "invalid header `{name}`: {detail}")
293            }
294            Self::InvalidSignature(message)
295            | Self::Json(message)
296            | Self::Secret(message)
297            | Self::EventLog(message)
298            | Self::HarnRuntime(message)
299            | Self::Unsupported(message)
300            | Self::Activation(message) => message.fmt(f),
301            Self::TimestampOutOfWindow {
302                timestamp,
303                now,
304                window,
305            } => write!(
306                f,
307                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
308            ),
309            Self::Client(error) => error.fmt(f),
310        }
311    }
312}
313
314impl std::error::Error for ConnectorError {}
315
316impl From<crate::event_log::LogError> for ConnectorError {
317    fn from(value: crate::event_log::LogError) -> Self {
318        Self::EventLog(value.to_string())
319    }
320}
321
322impl From<crate::secrets::SecretError> for ConnectorError {
323    fn from(value: crate::secrets::SecretError) -> Self {
324        Self::Secret(value.to_string())
325    }
326}
327
328impl From<serde_json::Error> for ConnectorError {
329    fn from(value: serde_json::Error) -> Self {
330        Self::Json(value.to_string())
331    }
332}
333
334impl From<ClientError> for ConnectorError {
335    fn from(value: ClientError) -> Self {
336        Self::Client(value)
337    }
338}
339
/// Startup context shared with connector instances.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log handle for connector appends.
    pub event_log: Arc<AnyEventLog>,
    /// Secret material lookup (e.g. signing secrets) via `SecretProvider`.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index consulted by `postprocess_normalized_event`.
    pub inbox: Arc<InboxIndex>,
    /// Connector-local metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
349
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    /// Dedupe claims successfully written to the inbox index.
    pub inbox_claims_written: u64,
    /// Total deliveries rejected as duplicates (fast-path + durable hits).
    pub inbox_duplicates_rejected: u64,
    /// Duplicates caught via the fast path (`record_inbox_duplicate_fast_path`).
    pub inbox_fast_path_hits: u64,
    /// Duplicates caught via the durable path (`record_inbox_duplicate_durable`).
    pub inbox_durable_hits: u64,
    /// Expired dedupe entries reaped from the inbox.
    pub inbox_expired_entries: u64,
    /// Last-stored count of live inbox entries (gauge-style).
    pub inbox_active_entries: u64,
    /// Linear webhook deliveries rejected for timestamp problems.
    pub linear_timestamp_rejections_total: u64,
    /// Dispatches that completed successfully.
    pub dispatch_succeeded_total: u64,
    /// Dispatches that failed.
    pub dispatch_failed_total: u64,
    /// Retries scheduled after failed dispatches.
    pub retry_scheduled_total: u64,
    /// Slack deliveries recorded as successful.
    pub slack_delivery_success_total: u64,
    /// Slack deliveries recorded as failed.
    pub slack_delivery_failure_total: u64,
}
366
// Label set attached to a metric series; `BTreeMap` keeps ordering
// deterministic so label sets compare and hash consistently as map keys.
type MetricLabels = BTreeMap<String, String>;

/// Cumulative histogram state: per-bucket counts plus running count and sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // Keyed by bucket label; NOTE(review): the fill logic lives in
    // `observe_histogram` outside this chunk — confirm the key format there.
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
375
376static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
377
378pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
379    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
380    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
381}
382
383pub fn clear_active_metrics_registry() {
384    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
385        *slot.lock().expect("active metrics registry poisoned") = None;
386    }
387}
388
389pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
390    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
391        slot.lock()
392            .expect("active metrics registry poisoned")
393            .clone()
394    })
395}
396
/// Shared metrics surface for connector-local counters and timings.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed atomic counters mirrored into `ConnectorMetricsSnapshot`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Ad-hoc named counters recorded via `record_custom_counter`.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labeled series keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events: label set -> (event id -> accepted-at ms),
    // maintained by `note_trigger_pending_event`/`clear_trigger_pending_event`.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
418
419impl MetricsRegistry {
    /// Bucket upper bounds (seconds) for short operation latencies, 5 ms – 5 s.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket upper bounds (seconds) for trigger lifecycle latencies, 5 ms – 5 min.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket upper bounds (bytes) for payload sizes, 128 B – 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
427
428    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
429        ConnectorMetricsSnapshot {
430            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
431            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
432            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
433            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
434            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
435            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
436            linear_timestamp_rejections_total: self
437                .linear_timestamp_rejections_total
438                .load(Ordering::Relaxed),
439            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
440            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
441            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
442            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
443            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
444        }
445    }
446
    /// Count a dedupe claim written to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate rejected on the fast path (bumps both the total and
    /// the fast-path counters).
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate rejected by the durable path (bumps both the total
    /// and the durable counters).
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Add a batch of reaped, expired inbox entries; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Store the current number of live inbox entries (gauge semantics).
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Count a Linear delivery rejected for a timestamp problem.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
502
503    pub fn record_custom_counter(&self, name: &str, amount: u64) {
504        if amount == 0 {
505            return;
506        }
507        let mut counters = self
508            .custom_counters
509            .lock()
510            .expect("custom counters poisoned");
511        *counters.entry(name.to_string()).or_default() += amount;
512    }
513
514    pub fn record_http_request(
515        &self,
516        endpoint: &str,
517        method: &str,
518        status: u16,
519        duration: StdDuration,
520        body_size_bytes: usize,
521    ) {
522        self.increment_counter(
523            "harn_http_requests_total",
524            labels([
525                ("endpoint", endpoint),
526                ("method", method),
527                ("status", &status.to_string()),
528            ]),
529            1,
530        );
531        self.observe_histogram(
532            "harn_http_request_duration_seconds",
533            labels([("endpoint", endpoint)]),
534            duration.as_secs_f64(),
535            &Self::DURATION_BUCKETS,
536        );
537        self.observe_histogram(
538            "harn_http_body_size_bytes",
539            labels([("endpoint", endpoint)]),
540            body_size_bytes as f64,
541            &Self::SIZE_BUCKETS,
542        );
543    }
544
    /// Count an inbound event received for `trigger_id` from `provider`.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count an event dropped by dedupe, labeled with the drop `reason`.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Count one predicate evaluation (labeled by its boolean `result`) and
    /// record its USD cost; negative costs are clamped to zero.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count a dispatch, labeled with the handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count a retry, labeled with the caller-supplied attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count an event routed to the dead-letter queue, labeled with `reason`.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
613
614    pub fn record_trigger_accepted_to_normalized(
615        &self,
616        trigger_id: &str,
617        binding_key: &str,
618        provider: &str,
619        tenant_id: Option<&str>,
620        status: &str,
621        duration: StdDuration,
622    ) {
623        self.observe_histogram(
624            "harn_trigger_webhook_accepted_to_normalized_seconds",
625            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
626            duration.as_secs_f64(),
627            &Self::TRIGGER_LATENCY_BUCKETS,
628        );
629    }
630
631    pub fn record_trigger_accepted_to_queue_append(
632        &self,
633        trigger_id: &str,
634        binding_key: &str,
635        provider: &str,
636        tenant_id: Option<&str>,
637        status: &str,
638        duration: StdDuration,
639    ) {
640        self.observe_histogram(
641            "harn_trigger_webhook_accepted_to_queue_append_seconds",
642            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
643            duration.as_secs_f64(),
644            &Self::TRIGGER_LATENCY_BUCKETS,
645        );
646    }
647
648    pub fn record_trigger_queue_age_at_dispatch_admission(
649        &self,
650        trigger_id: &str,
651        binding_key: &str,
652        provider: &str,
653        tenant_id: Option<&str>,
654        status: &str,
655        age: StdDuration,
656    ) {
657        self.observe_histogram(
658            "harn_trigger_queue_age_at_dispatch_admission_seconds",
659            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
660            age.as_secs_f64(),
661            &Self::TRIGGER_LATENCY_BUCKETS,
662        );
663    }
664
665    pub fn record_trigger_queue_age_at_dispatch_start(
666        &self,
667        trigger_id: &str,
668        binding_key: &str,
669        provider: &str,
670        tenant_id: Option<&str>,
671        status: &str,
672        age: StdDuration,
673    ) {
674        self.observe_histogram(
675            "harn_trigger_queue_age_at_dispatch_start_seconds",
676            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
677            age.as_secs_f64(),
678            &Self::TRIGGER_LATENCY_BUCKETS,
679        );
680    }
681
682    pub fn record_trigger_dispatch_runtime(
683        &self,
684        trigger_id: &str,
685        binding_key: &str,
686        provider: &str,
687        tenant_id: Option<&str>,
688        status: &str,
689        duration: StdDuration,
690    ) {
691        self.observe_histogram(
692            "harn_trigger_dispatch_runtime_seconds",
693            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
694            duration.as_secs_f64(),
695            &Self::TRIGGER_LATENCY_BUCKETS,
696        );
697    }
698
699    pub fn record_trigger_retry_delay(
700        &self,
701        trigger_id: &str,
702        binding_key: &str,
703        provider: &str,
704        tenant_id: Option<&str>,
705        status: &str,
706        duration: StdDuration,
707    ) {
708        self.observe_histogram(
709            "harn_trigger_retry_delay_seconds",
710            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
711            duration.as_secs_f64(),
712            &Self::TRIGGER_LATENCY_BUCKETS,
713        );
714    }
715
716    pub fn record_trigger_accepted_to_dlq(
717        &self,
718        trigger_id: &str,
719        binding_key: &str,
720        provider: &str,
721        tenant_id: Option<&str>,
722        status: &str,
723        duration: StdDuration,
724    ) {
725        self.observe_histogram(
726            "harn_trigger_accepted_to_dlq_seconds",
727            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
728            duration.as_secs_f64(),
729            &Self::TRIGGER_LATENCY_BUCKETS,
730        );
731    }
732
733    pub fn note_trigger_pending_event(
734        &self,
735        event_id: &str,
736        trigger_id: &str,
737        binding_key: &str,
738        provider: &str,
739        tenant_id: Option<&str>,
740        accepted_at_ms: i64,
741        now_ms: i64,
742    ) {
743        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
744        {
745            let mut pending = self
746                .pending_trigger_events
747                .lock()
748                .expect("pending trigger events poisoned");
749            pending
750                .entry(labels.clone())
751                .or_default()
752                .insert(event_id.to_string(), accepted_at_ms);
753        }
754        self.refresh_oldest_pending_gauge(labels, now_ms);
755    }
756
757    pub fn clear_trigger_pending_event(
758        &self,
759        event_id: &str,
760        trigger_id: &str,
761        binding_key: &str,
762        provider: &str,
763        tenant_id: Option<&str>,
764        now_ms: i64,
765    ) {
766        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
767        {
768            let mut pending = self
769                .pending_trigger_events
770                .lock()
771                .expect("pending trigger events poisoned");
772            if let Some(events) = pending.get_mut(&labels) {
773                events.remove(event_id);
774                if events.is_empty() {
775                    pending.remove(&labels);
776                }
777            }
778        }
779        self.refresh_oldest_pending_gauge(labels, now_ms);
780    }
781
    /// Gauge: number of currently in-flight executions for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Gauge: trigger budget spent today in USD; negatives clamp to zero.
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Count a budget-exhaustion event, labeled with the applied strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Count a backpressure action taken along the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
813
    /// Record one event-log append: a latency histogram plus a gauge of the
    /// payload size written for the topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        // NOTE(review): this gauge stores the latest append's payload size,
        // not a cumulative topic size, despite the metric name — confirm intent.
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Gauge: event-log consumer lag for a topic/consumer pair (units are
    /// caller-defined; presumably outstanding entries — verify at call sites).
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record one agent-to-agent hop: outcome counter plus latency histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Gauge: current depth of a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Histogram: age of a queue claim in seconds; negatives clamp to zero.
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
871
    /// Increment the scheduler-selection counter for a particular fairness key.
    pub fn record_scheduler_selection(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_selections_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Increment the scheduler-deferred counter (queue had work but couldn't
    /// be selected because the key was at its concurrency cap).
    pub fn record_scheduler_deferral(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_deferrals_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Increment the scheduler starvation-promotion counter.
    pub fn record_scheduler_starvation_promotion(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_starvation_promotions_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Set the current scheduler deficit gauge for a fairness key.
    ///
    /// Deficits may be negative, hence the signed parameter.
    pub fn set_scheduler_deficit(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        deficit: i64,
    ) {
        self.set_gauge(
            "harn_scheduler_deficit",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            deficit as f64,
        );
    }

    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
    ///
    /// The caller supplies the age in milliseconds; it is converted to
    /// fractional seconds before being stored.
    pub fn set_scheduler_oldest_eligible_age(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        age_ms: u64,
    ) {
        self.set_gauge(
            "harn_scheduler_oldest_eligible_age_seconds",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            age_ms as f64 / 1000.0,
        );
    }
964
965    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
966        self.set_gauge(
967            "harn_orchestrator_pump_backlog",
968            labels([("topic", topic)]),
969            count as f64,
970        );
971    }
972
973    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
974        self.set_gauge(
975            "harn_orchestrator_pump_outstanding",
976            labels([("topic", topic)]),
977            count as f64,
978        );
979    }
980
981    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
982        self.observe_histogram(
983            "harn_orchestrator_pump_admission_delay_seconds",
984            labels([("topic", topic)]),
985            duration.as_secs_f64(),
986            &Self::DURATION_BUCKETS,
987        );
988    }
989
990    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
991        self.increment_counter(
992            "harn_llm_calls_total",
993            labels([
994                ("provider", provider),
995                ("model", model),
996                ("outcome", outcome),
997            ]),
998            1,
999        );
1000        if cost_usd > 0.0 {
1001            self.increment_counter(
1002                "harn_llm_cost_usd_total",
1003                labels([("provider", provider), ("model", model)]),
1004                cost_usd,
1005            );
1006        } else {
1007            self.ensure_counter(
1008                "harn_llm_cost_usd_total",
1009                labels([("provider", provider), ("model", model)]),
1010            );
1011        }
1012    }
1013
1014    pub fn record_llm_cache_hit(&self, provider: &str) {
1015        self.increment_counter(
1016            "harn_llm_cache_hits_total",
1017            labels([("provider", provider)]),
1018            1,
1019        );
1020    }
1021
    /// Render the full metrics snapshot in Prometheus text exposition format.
    ///
    /// Output order: a fixed set of legacy connector counters taken from
    /// `snapshot()`, then the sanitized custom counters, two hard-coded
    /// auto-disable threshold gauges, and finally every generic
    /// counter/gauge/histogram family via `render_generic_metrics`.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Legacy fixed-name, label-free counters sourced from the aggregate
        // snapshot rather than the generic counter map.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize arbitrary custom-counter names into valid Prometheus
            // metric names: anything outside [A-Za-z0-9_] becomes '_'.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static configuration gauges advertising the Slack auto-disable
        // thresholds; the values are compile-time constants here.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
1086
1087    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1088        let amount = amount.into();
1089        if amount <= 0.0 || !amount.is_finite() {
1090            return;
1091        }
1092        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1093        *counters.entry((name.to_string(), labels)).or_default() += amount;
1094    }
1095
1096    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1097        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1098        counters.entry((name.to_string(), labels)).or_default();
1099    }
1100
1101    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1102        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1103        gauges.insert((name.to_string(), labels), value);
1104    }
1105
    /// Record one observation into a cumulative-bucket histogram.
    ///
    /// Buckets are created lazily from `bucket_bounds` (plus the implicit
    /// `+Inf` catch-all) on the first observation for a name/label pair;
    /// subsequent calls are expected to use the same bounds for that metric.
    /// Non-finite values are dropped entirely (no count/sum update either).
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                // Bucket keys are the rendered `le` strings for each bound.
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        // Cumulative semantics: every bucket whose bound >= value is bumped,
        // and "+Inf" always is.
        // NOTE(review): rendering iterates `buckets` in the container's own
        // order; if `HistogramMetric::buckets` is keyed by the string form of
        // the bound, ascending-`le` order (with "+Inf" last) is not
        // guaranteed — confirm the container preserves insertion order.
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1138
1139    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1140        let oldest_accepted_at_ms = self
1141            .pending_trigger_events
1142            .lock()
1143            .expect("pending trigger events poisoned")
1144            .get(&labels)
1145            .and_then(|events| events.values().min().copied());
1146        let age_seconds = oldest_accepted_at_ms
1147            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1148            .unwrap_or(0.0);
1149        self.set_gauge(
1150            "harn_trigger_oldest_pending_age_seconds",
1151            labels,
1152            age_seconds,
1153        );
1154    }
1155
1156    fn render_generic_metrics(&self, rendered: &mut String) {
1157        let counters = self
1158            .counters
1159            .lock()
1160            .expect("metrics counters poisoned")
1161            .clone();
1162        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1163        let histograms = self
1164            .histograms
1165            .lock()
1166            .expect("metrics histograms poisoned")
1167            .clone();
1168
1169        for name in metric_family_names(MetricKind::Counter) {
1170            rendered.push_str("# TYPE ");
1171            rendered.push_str(name);
1172            rendered.push_str(" counter\n");
1173            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1174                render_sample(rendered, sample_name, labels, *value);
1175            }
1176        }
1177        for name in metric_family_names(MetricKind::Gauge) {
1178            rendered.push_str("# TYPE ");
1179            rendered.push_str(name);
1180            rendered.push_str(" gauge\n");
1181            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1182                render_sample(rendered, sample_name, labels, *value);
1183            }
1184        }
1185        for name in metric_family_names(MetricKind::Histogram) {
1186            rendered.push_str("# TYPE ");
1187            rendered.push_str(name);
1188            rendered.push_str(" histogram\n");
1189            for ((sample_name, labels), histogram) in
1190                histograms.iter().filter(|((n, _), _)| n == name)
1191            {
1192                for (le, value) in &histogram.buckets {
1193                    let mut bucket_labels = labels.clone();
1194                    bucket_labels.insert("le".to_string(), le.clone());
1195                    render_sample(
1196                        rendered,
1197                        &format!("{sample_name}_bucket"),
1198                        &bucket_labels,
1199                        *value as f64,
1200                    );
1201                }
1202                render_sample(
1203                    rendered,
1204                    &format!("{sample_name}_sum"),
1205                    labels,
1206                    histogram.sum,
1207                );
1208                render_sample(
1209                    rendered,
1210                    &format!("{sample_name}_count"),
1211                    labels,
1212                    histogram.count as f64,
1213                );
1214            }
1215        }
1216    }
1217}
1218
/// Which Prometheus family class a generic metric name belongs to.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Static registry of the generic metric family names, grouped by kind.
///
/// The renderer walks these lists so families appear in a stable order and
/// carry the correct `# TYPE` annotation.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTER_FAMILIES: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
        "harn_scheduler_selections_total",
        "harn_scheduler_deferrals_total",
        "harn_scheduler_starvation_promotions_total",
    ];
    const GAUGE_FAMILIES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
        "harn_scheduler_deficit",
        "harn_scheduler_oldest_eligible_age_seconds",
    ];
    const HISTOGRAM_FAMILIES: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];
    match kind {
        MetricKind::Counter => COUNTER_FAMILIES,
        MetricKind::Gauge => GAUGE_FAMILIES,
        MetricKind::Histogram => HISTOGRAM_FAMILIES,
    }
}
1276
1277fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1278    pairs
1279        .into_iter()
1280        .map(|(name, value)| (name.to_string(), value.to_string()))
1281        .collect()
1282}
1283
1284fn trigger_lifecycle_labels(
1285    trigger_id: &str,
1286    binding_key: &str,
1287    provider: &str,
1288    tenant_id: Option<&str>,
1289    status: &str,
1290) -> MetricLabels {
1291    labels([
1292        ("binding_key", binding_key),
1293        ("provider", provider),
1294        ("status", status),
1295        ("tenant_id", tenant_label(tenant_id)),
1296        ("trigger_id", trigger_id),
1297    ])
1298}
1299
1300fn trigger_pending_labels(
1301    trigger_id: &str,
1302    binding_key: &str,
1303    provider: &str,
1304    tenant_id: Option<&str>,
1305) -> MetricLabels {
1306    labels([
1307        ("binding_key", binding_key),
1308        ("provider", provider),
1309        ("tenant_id", tenant_label(tenant_id)),
1310        ("trigger_id", trigger_id),
1311    ])
1312}
1313
/// Normalize an optional tenant id into a metric label value.
///
/// Surrounding whitespace is trimmed; absent or blank ids collapse to the
/// sentinel `"none"` so the label set stays bounded and non-empty.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1320
/// Duration between two millisecond timestamps, clamped to zero when
/// `earlier_ms` is actually the later of the two.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    if delta_ms <= 0 {
        StdDuration::ZERO
    } else {
        StdDuration::from_millis(delta_ms as u64)
    }
}
1324
1325fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1326    rendered.push_str(name);
1327    if !labels.is_empty() {
1328        rendered.push('{');
1329        for (index, (label, label_value)) in labels.iter().enumerate() {
1330            if index > 0 {
1331                rendered.push(',');
1332            }
1333            rendered.push_str(label);
1334            rendered.push_str("=\"");
1335            rendered.push_str(&escape_label_value(label_value));
1336            rendered.push('"');
1337        }
1338        rendered.push('}');
1339    }
1340    rendered.push(' ');
1341    rendered.push_str(&prometheus_float(value));
1342    rendered.push('\n');
1343}
1344
/// Escape a label value per the Prometheus text exposition format:
/// backslash, double quote, and newline are backslash-escaped.
///
/// Builds the result in a single pre-sized `String` instead of the previous
/// `flat_map` over per-character `Vec` allocations.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1356
/// Format a float the way the Prometheus text exposition format expects.
///
/// Integral values render without a decimal point; fractional values render
/// with up to six decimals, trailing zeros stripped. The special values
/// render as `+Inf`, `-Inf`, and `NaN`.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        // `{:.6}` of NaN already yields "NaN"; made explicit for clarity.
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Previously only +Inf was special-cased; -Inf fell through to the
        // formatting path and rendered as "-inf", which the exposition
        // format does not accept.
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1371
/// Provider payload schema metadata exposed by a connector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Logical harn schema name (e.g. `"raw"` for the default).
    pub harn_schema_name: String,
    /// Attached JSON Schema document; `Null` when only the name is known.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1379
1380impl ProviderPayloadSchema {
1381    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1382        Self {
1383            harn_schema_name: harn_schema_name.into(),
1384            json_schema,
1385        }
1386    }
1387
1388    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1389        Self::new(harn_schema_name, JsonValue::Null)
1390    }
1391}
1392
1393impl Default for ProviderPayloadSchema {
1394    fn default() -> Self {
1395        Self::named("raw")
1396    }
1397}
1398
/// High-level transport kind a connector supports.
///
/// Thin newtype over the raw kind string; `serde(transparent)` keeps the
/// wire representation a bare string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1403
1404impl TriggerKind {
1405    pub fn new(value: impl Into<String>) -> Self {
1406        Self(value.into())
1407    }
1408
1409    pub fn as_str(&self) -> &str {
1410        self.0.as_str()
1411    }
1412}
1413
1414impl From<&str> for TriggerKind {
1415    fn from(value: &str) -> Self {
1416        Self::new(value)
1417    }
1418}
1419
1420impl From<String> for TriggerKind {
1421    fn from(value: String) -> Self {
1422        Self::new(value)
1423    }
1424}
1425
/// Future trigger manifest binding routed to a connector activation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding is routed to.
    pub provider: ProviderId,
    /// Transport kind within the provider.
    pub kind: TriggerKind,
    /// Caller-chosen identifier for this binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` by default (connector-specific semantics —
    /// confirm against consumers).
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Days dedupe records are retained; defaults to the inbox retention
    /// default via `default_dedupe_retention_days`.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Free-form connector-specific configuration; JSON null when absent.
    #[serde(default)]
    pub config: JsonValue,
}
1439
1440impl TriggerBinding {
1441    pub fn new(
1442        provider: ProviderId,
1443        kind: impl Into<TriggerKind>,
1444        binding_id: impl Into<String>,
1445    ) -> Self {
1446        Self {
1447            provider,
1448            kind: kind.into(),
1449            binding_id: binding_id.into(),
1450            dedupe_key: None,
1451            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1452            config: JsonValue::Null,
1453        }
1454    }
1455}
1456
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1460
/// Small in-memory trigger-binding registry used to fan bindings into connectors.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // Bindings grouped by provider; BTreeMap keeps provider iteration stable.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1466
1467impl TriggerRegistry {
1468    pub fn register(&mut self, binding: TriggerBinding) {
1469        self.bindings
1470            .entry(binding.provider.clone())
1471            .or_default()
1472            .push(binding);
1473    }
1474
1475    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1476        &self.bindings
1477    }
1478
1479    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1480        self.bindings
1481            .get(provider)
1482            .map(Vec::as_slice)
1483            .unwrap_or(&[])
1484    }
1485}
1486
/// Metadata returned from connector activation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider that was activated.
    pub provider: ProviderId,
    /// Number of trigger bindings the activation covered.
    pub binding_count: usize,
}
1493
1494impl ActivationHandle {
1495    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1496        Self {
1497            provider,
1498            binding_count,
1499        }
1500    }
1501}
1502
/// Provider-native inbound request payload preserved as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Provider-specific payload kind discriminator.
    pub kind: String,
    /// Transport headers (sorted map).
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters, if any.
    pub query: BTreeMap<String, String>,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// When this process received the payload (clock-sourced in `new`).
    pub received_at: OffsetDateTime,
    /// Event occurrence time, when known; `None` by default.
    pub occurred_at: Option<OffsetDateTime>,
    /// Tenant attribution, when known at ingress.
    pub tenant_id: Option<TenantId>,
    /// Extra provider metadata; JSON null when absent.
    pub metadata: JsonValue,
}
1515
1516impl RawInbound {
1517    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1518        Self {
1519            kind: kind.into(),
1520            headers,
1521            query: BTreeMap::new(),
1522            body,
1523            received_at: clock::now_utc(),
1524            occurred_at: None,
1525            tenant_id: None,
1526            metadata: JsonValue::Null,
1527        }
1528    }
1529
1530    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1531        Ok(serde_json::from_slice(&self.body)?)
1532    }
1533}
1534
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens the bucket can hold (burst size).
    pub capacity: u32,
    /// Tokens restored per `refill_interval`.
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` are restored.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Defaults to a 60-token burst refilled at one token per second.
    fn default() -> Self {
        RateLimitConfig {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1552
/// Mutable token-bucket state for one rate-limit scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional token balance; one whole token is consumed per acquire.
    tokens: f64,
    // Instant of the most recent refill, used to compute elapsed time.
    last_refill: ClockInstant,
}
1558
impl TokenBucket {
    /// A bucket starting at full capacity, stamped with the current instant.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Top the balance up for the time elapsed since the last refill, capped
    /// at capacity. Degenerate configs are clamped (interval to epsilon,
    /// refill_tokens/capacity to 1) so the rate is never 0 or a division by
    /// zero.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refill, then consume one token if at least one whole token is
    /// available.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimated wait until one whole token is available (assuming no other
    /// consumers), floored at 1ms so callers never busy-spin. Does not
    /// refill first — callers refill via `try_acquire` before consulting it.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1595
/// Shared per-provider, per-key token bucket factory for outbound connector clients.
#[derive(Debug)]
pub struct RateLimiterFactory {
    // Single configuration shared by every bucket this factory creates.
    config: RateLimitConfig,
    // Lazily-created buckets keyed by (provider id, caller-supplied key).
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1602
1603impl RateLimiterFactory {
1604    pub fn new(config: RateLimitConfig) -> Self {
1605        Self {
1606            config,
1607            buckets: Mutex::new(HashMap::new()),
1608        }
1609    }
1610
1611    pub fn config(&self) -> RateLimitConfig {
1612        self.config
1613    }
1614
1615    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1616        ScopedRateLimiter {
1617            factory: self,
1618            provider: provider.clone(),
1619            key: key.into(),
1620        }
1621    }
1622
1623    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1624        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1625        let bucket = buckets
1626            .entry((provider.as_str().to_string(), key.to_string()))
1627            .or_insert_with(|| TokenBucket::full(self.config));
1628        bucket.try_acquire(self.config, clock::instant_now())
1629    }
1630
1631    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1632        loop {
1633            let wait = {
1634                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1635                let bucket = buckets
1636                    .entry((provider.as_str().to_string(), key.to_string()))
1637                    .or_insert_with(|| TokenBucket::full(self.config));
1638                if bucket.try_acquire(self.config, clock::instant_now()) {
1639                    return;
1640                }
1641                bucket.wait_duration(self.config)
1642            };
1643            tokio::time::sleep(wait).await;
1644        }
1645    }
1646}
1647
1648impl Default for RateLimiterFactory {
1649    fn default() -> Self {
1650        Self::new(RateLimitConfig::default())
1651    }
1652}
1653
/// Borrowed view onto a single provider/key rate-limit scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Owning factory that actually holds the buckets.
    factory: &'a RateLimiterFactory,
    // Scope identity: the bucket is keyed on (provider, key).
    provider: ProviderId,
    key: String,
}
1661
1662impl<'a> ScopedRateLimiter<'a> {
1663    pub fn try_acquire(&self) -> bool {
1664        self.factory.try_acquire(&self.provider, &self.key)
1665    }
1666
1667    pub async fn acquire(&self) {
1668        self.factory.acquire(&self.provider, &self.key).await;
1669    }
1670}
1671
/// Runtime connector registry keyed by provider id.
pub struct ConnectorRegistry {
    // Each connector sits behind a shared async mutex (`ConnectorHandle`,
    // see `register`), serializing access across awaits.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1676
1677impl ConnectorRegistry {
1678    pub fn empty() -> Self {
1679        Self {
1680            connectors: BTreeMap::new(),
1681        }
1682    }
1683
1684    pub fn with_defaults() -> Self {
1685        let mut registry = Self::empty();
1686        for provider in registered_provider_metadata() {
1687            registry
1688                .register(default_connector_for_provider(&provider))
1689                .expect("default connector registration should not fail");
1690        }
1691        registry
1692    }
1693
1694    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1695        let provider = connector.provider_id().clone();
1696        if self.connectors.contains_key(&provider) {
1697            return Err(ConnectorError::DuplicateProvider(provider.0));
1698        }
1699        self.connectors
1700            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1701        Ok(())
1702    }
1703
1704    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1705        self.connectors.get(id).cloned()
1706    }
1707
1708    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1709        self.connectors.remove(id)
1710    }
1711
1712    pub fn list(&self) -> Vec<ProviderId> {
1713        self.connectors.keys().cloned().collect()
1714    }
1715
1716    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1717        for connector in self.connectors.values() {
1718            connector.lock().await.init(ctx.clone()).await?;
1719        }
1720        Ok(())
1721    }
1722
1723    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1724        let mut clients = BTreeMap::new();
1725        for (provider, connector) in &self.connectors {
1726            let client = connector.lock().await.client();
1727            clients.insert(provider.clone(), client);
1728        }
1729        clients
1730    }
1731
1732    pub async fn activate_all(
1733        &self,
1734        registry: &TriggerRegistry,
1735    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1736        let mut handles = Vec::new();
1737        for (provider, connector) in &self.connectors {
1738            let bindings = registry.bindings_for(provider);
1739            if bindings.is_empty() {
1740                continue;
1741            }
1742            let connector = connector.lock().await;
1743            handles.push(connector.activate(bindings).await?);
1744        }
1745        Ok(handles)
1746    }
1747}
1748
impl Default for ConnectorRegistry {
    /// Defaults to the catalog-driven registry (`with_defaults`), not an
    /// empty one.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1754
1755fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1756    // The provider catalog on main registers `github` with
1757    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1758    // before a native connector existed the catalog auto-wired a
1759    // GenericWebhookConnector. Now that #170 lands a first-class
1760    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1761    // provider_id "github" here and return the native connector instead of a
1762    // webhook-backed fallback. This keeps manifests that say
1763    // `provider = "github"` pointed at the new connector without requiring
1764    // users to switch to a distinct provider_id.
1765    if provider.provider == "github" {
1766        return Box::new(GitHubConnector::new());
1767    }
1768    if provider.provider == "linear" {
1769        return Box::new(LinearConnector::new());
1770    }
1771    if provider.provider == "slack" {
1772        return Box::new(SlackConnector::new());
1773    }
1774    if provider.provider == "notion" {
1775        return Box::new(NotionConnector::new());
1776    }
1777    if provider.provider == "a2a-push" {
1778        return Box::new(A2aPushConnector::new());
1779    }
1780    match &provider.runtime {
1781        ProviderRuntimeMetadata::Builtin {
1782            connector,
1783            default_signature_variant,
1784        } => match connector.as_str() {
1785            "cron" => Box::new(CronConnector::new()),
1786            "stream" => Box::new(StreamConnector::new(
1787                ProviderId::from(provider.provider.clone()),
1788                provider.schema_name.clone(),
1789            )),
1790            "webhook" => {
1791                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1792                    .expect("catalog webhook signature variant must be valid");
1793                Box::new(GenericWebhookConnector::with_profile(
1794                    WebhookProviderProfile::new(
1795                        ProviderId::from(provider.provider.clone()),
1796                        provider.schema_name.clone(),
1797                        variant,
1798                    ),
1799                ))
1800            }
1801            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1802        },
1803        ProviderRuntimeMetadata::Placeholder => {
1804            Box::new(PlaceholderConnector::from_metadata(provider))
1805        }
1806    }
1807}
1808
/// Fallback connector for providers that are present in the catalog but do
/// not yet have a concrete runtime implementation. It advertises the
/// catalog's metadata and fails any actual inbound/outbound work.
struct PlaceholderConnector {
    // Provider this placeholder stands in for.
    provider_id: ProviderId,
    // Trigger kinds copied from the catalog entry.
    kinds: Vec<TriggerKind>,
    // Payload schema name copied from the catalog entry.
    schema_name: String,
}
1814
1815impl PlaceholderConnector {
1816    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1817        Self {
1818            provider_id: ProviderId::from(metadata.provider.clone()),
1819            kinds: metadata
1820                .kinds
1821                .iter()
1822                .cloned()
1823                .map(TriggerKind::from)
1824                .collect(),
1825            schema_name: metadata.schema_name.clone(),
1826        }
1827    }
1828}
1829
1830struct PlaceholderClient;
1831
1832#[async_trait]
1833impl ConnectorClient for PlaceholderClient {
1834    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1835        Err(ClientError::Other(format!(
1836            "connector client method '{method}' is not implemented for this provider"
1837        )))
1838    }
1839}
1840
1841#[async_trait]
1842impl Connector for PlaceholderConnector {
1843    fn provider_id(&self) -> &ProviderId {
1844        &self.provider_id
1845    }
1846
1847    fn kinds(&self) -> &[TriggerKind] {
1848        &self.kinds
1849    }
1850
1851    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1852        Ok(())
1853    }
1854
1855    async fn activate(
1856        &self,
1857        bindings: &[TriggerBinding],
1858    ) -> Result<ActivationHandle, ConnectorError> {
1859        Ok(ActivationHandle::new(
1860            self.provider_id.clone(),
1861            bindings.len(),
1862        ))
1863    }
1864
1865    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1866        Err(ConnectorError::Unsupported(format!(
1867            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1868            self.provider_id.as_str()
1869        )))
1870    }
1871
1872    fn payload_schema(&self) -> ProviderPayloadSchema {
1873        ProviderPayloadSchema::named(self.schema_name.clone())
1874    }
1875
1876    fn client(&self) -> Arc<dyn ConnectorClient> {
1877        Arc::new(PlaceholderClient)
1878    }
1879}
1880
1881pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1882    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1883        *slot.borrow_mut() = clients
1884            .into_iter()
1885            .map(|(provider, client)| (provider.as_str().to_string(), client))
1886            .collect();
1887    });
1888}
1889
1890pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1891    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1892}
1893
1894pub fn clear_active_connector_clients() {
1895    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1896}
1897
#[cfg(test)]
// Unit tests for the connector registry, rate limiter, raw-inbound helpers,
// and the Prometheus metrics surface. The metrics test asserts on exact
// rendered sample lines, so label ordering and bucket boundaries here must
// stay in lockstep with the MetricsRegistry implementation.
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client double: echoes the called method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Connector double that counts `activate` invocations via a shared
    // atomic, so tests can observe activations after boxing.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            // Record the activation so tests can assert which connectors ran.
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering two connectors under the same provider id must fail with
    // DuplicateProvider carrying the offending id.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only connectors that have at least one trigger binding should be
    // activated; unbound connectors (slack here) stay untouched.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Token buckets are keyed by (provider, key): exhausting one bucket must
    // not affect a different key or a different provider.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        // Capacity 1 and a 60s refill: the second acquire on the same scope fails.
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // json_body() must parse the stored raw bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is pre-populated from the provider catalog and
    // must include the builtin cron/github/webhook providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Exercises every orchestrator metric recorder once, then asserts each
    // expected Prometheus sample line (exact labels, ordering, and histogram
    // bucket) appears in the rendered output.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Stage-latency histograms below all share the same label set:
        // (trigger_id, binding_key, provider, tenant, status, duration).
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Each needle is one exact rendered sample line; a mismatch in label
        // order or bucket boundary fails with the full rendering attached.
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}