Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44    connector_export_denied_builtin_reason, connector_export_effect_class,
45    default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57    SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60    paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61    ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
/// Upper bound applied to every outbound connector HTTP request (30 seconds).
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_millis(30_000);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70    reqwest::Client::builder()
71        .user_agent(user_agent)
72        .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73        .redirect(reqwest::redirect::Policy::custom(|attempt| {
74            if attempt.previous().len() >= 10 {
75                attempt.error("too many redirects")
76            } else if crate::egress::redirect_url_allowed(
77                "connector_redirect",
78                attempt.url().as_str(),
79            ) {
80                attempt.follow()
81            } else {
82                attempt.error("egress policy blocked redirect target")
83            }
84        }))
85        .build()
86        .expect("connector HTTP client configuration should be valid")
87}
88
89/// Shared owned handle to a connector instance registered with the runtime.
90pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
91
92thread_local! {
93    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
94        RefCell::new(BTreeMap::new());
95}
96
97/// Provider implementation contract for inbound connectors.
98#[async_trait]
99pub trait Connector: Send + Sync {
100    /// Stable provider id such as `github`, `slack`, or `webhook`.
101    fn provider_id(&self) -> &ProviderId;
102
103    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
104    fn kinds(&self) -> &[TriggerKind];
105
106    /// Called once per connector instance at orchestrator startup.
107    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
108
109    /// Activate the bindings relevant to this connector instance.
110    async fn activate(
111        &self,
112        bindings: &[TriggerBinding],
113    ) -> Result<ActivationHandle, ConnectorError>;
114
115    /// Stop connector-owned background work and flush any connector-local state.
116    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
117        Ok(())
118    }
119
120    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
121    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
122
123    /// Verify + normalize a provider-native inbound request into the richer
124    /// connector result contract used by ack-first webhook adapters.
125    async fn normalize_inbound_result(
126        &self,
127        raw: RawInbound,
128    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
129        self.normalize_inbound(raw)
130            .await
131            .map(ConnectorNormalizeResult::event)
132    }
133
134    /// Payload schema surfaced to future trigger-type narrowing.
135    fn payload_schema(&self) -> ProviderPayloadSchema;
136
137    /// Outbound API wrapper exposed to handlers.
138    fn client(&self) -> Arc<dyn ConnectorClient>;
139}
140
141/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
142#[derive(Clone, Debug, PartialEq)]
143pub struct ConnectorHttpResponse {
144    pub status: u16,
145    pub headers: BTreeMap<String, String>,
146    pub body: JsonValue,
147}
148
149impl ConnectorHttpResponse {
150    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
151        Self {
152            status,
153            headers,
154            body,
155        }
156    }
157}
158
159/// Normalized inbound result accepted by the runtime connector adapter.
160#[derive(Clone, Debug, PartialEq)]
161pub enum ConnectorNormalizeResult {
162    Event(Box<TriggerEvent>),
163    Batch(Vec<TriggerEvent>),
164    ImmediateResponse {
165        response: ConnectorHttpResponse,
166        events: Vec<TriggerEvent>,
167    },
168    Reject(ConnectorHttpResponse),
169}
170
171impl ConnectorNormalizeResult {
172    pub fn event(event: TriggerEvent) -> Self {
173        Self::Event(Box::new(event))
174    }
175
176    pub fn into_events(self) -> Vec<TriggerEvent> {
177        match self {
178            Self::Event(event) => vec![*event],
179            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
180            Self::Reject(_) => Vec::new(),
181        }
182    }
183}
184
185#[derive(Clone, Debug, PartialEq)]
186pub enum PostNormalizeOutcome {
187    Ready(Box<TriggerEvent>),
188    DuplicateDropped,
189}
190
191pub async fn postprocess_normalized_event(
192    inbox: &InboxIndex,
193    binding_id: &str,
194    dedupe_enabled: bool,
195    dedupe_ttl: StdDuration,
196    mut event: TriggerEvent,
197) -> Result<PostNormalizeOutcome, ConnectorError> {
198    if dedupe_enabled && !event.dedupe_claimed() {
199        if !inbox
200            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
201            .await?
202        {
203            return Ok(PostNormalizeOutcome::DuplicateDropped);
204        }
205        event.mark_dedupe_claimed();
206    }
207
208    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
209}
210
211/// Outbound provider client interface used by connector-backed stdlib modules.
212#[async_trait]
213pub trait ConnectorClient: Send + Sync {
214    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
215}
216
217/// Minimal outbound client errors shared by connector implementations.
218#[derive(Clone, Debug, PartialEq, Eq)]
219pub enum ClientError {
220    MethodNotFound(String),
221    InvalidArgs(String),
222    RateLimited(String),
223    Transport(String),
224    EgressBlocked(crate::egress::EgressBlocked),
225    Other(String),
226}
227
228impl fmt::Display for ClientError {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        match self {
231            Self::MethodNotFound(message)
232            | Self::InvalidArgs(message)
233            | Self::RateLimited(message)
234            | Self::Transport(message)
235            | Self::Other(message) => message.fmt(f),
236            Self::EgressBlocked(blocked) => blocked.fmt(f),
237        }
238    }
239}
240
241impl std::error::Error for ClientError {}
242
243/// Shared connector-layer errors.
244#[derive(Debug)]
245pub enum ConnectorError {
246    DuplicateProvider(String),
247    DuplicateDelivery(String),
248    UnknownProvider(String),
249    MissingHeader(String),
250    InvalidHeader {
251        name: String,
252        detail: String,
253    },
254    InvalidSignature(String),
255    TimestampOutOfWindow {
256        timestamp: OffsetDateTime,
257        now: OffsetDateTime,
258        window: time::Duration,
259    },
260    Json(String),
261    Secret(String),
262    EventLog(String),
263    HarnRuntime(String),
264    Client(ClientError),
265    Unsupported(String),
266    Activation(String),
267}
268
269impl ConnectorError {
270    pub fn invalid_signature(message: impl Into<String>) -> Self {
271        Self::InvalidSignature(message.into())
272    }
273}
274
275impl fmt::Display for ConnectorError {
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        match self {
278            Self::DuplicateProvider(provider) => {
279                write!(f, "connector provider `{provider}` is already registered")
280            }
281            Self::DuplicateDelivery(message) => message.fmt(f),
282            Self::UnknownProvider(provider) => {
283                write!(f, "connector provider `{provider}` is not registered")
284            }
285            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
286            Self::InvalidHeader { name, detail } => {
287                write!(f, "invalid header `{name}`: {detail}")
288            }
289            Self::InvalidSignature(message)
290            | Self::Json(message)
291            | Self::Secret(message)
292            | Self::EventLog(message)
293            | Self::HarnRuntime(message)
294            | Self::Unsupported(message)
295            | Self::Activation(message) => message.fmt(f),
296            Self::TimestampOutOfWindow {
297                timestamp,
298                now,
299                window,
300            } => write!(
301                f,
302                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
303            ),
304            Self::Client(error) => error.fmt(f),
305        }
306    }
307}
308
309impl std::error::Error for ConnectorError {}
310
311impl From<crate::event_log::LogError> for ConnectorError {
312    fn from(value: crate::event_log::LogError) -> Self {
313        Self::EventLog(value.to_string())
314    }
315}
316
317impl From<crate::secrets::SecretError> for ConnectorError {
318    fn from(value: crate::secrets::SecretError) -> Self {
319        Self::Secret(value.to_string())
320    }
321}
322
323impl From<serde_json::Error> for ConnectorError {
324    fn from(value: serde_json::Error) -> Self {
325        Self::Json(value.to_string())
326    }
327}
328
329impl From<ClientError> for ConnectorError {
330    fn from(value: ClientError) -> Self {
331        Self::Client(value)
332    }
333}
334
335/// Startup context shared with connector instances.
336#[derive(Clone)]
337pub struct ConnectorCtx {
338    pub event_log: Arc<AnyEventLog>,
339    pub secrets: Arc<dyn SecretProvider>,
340    pub inbox: Arc<InboxIndex>,
341    pub metrics: Arc<MetricsRegistry>,
342    pub rate_limiter: Arc<RateLimiterFactory>,
343}
344
/// Point-in-time copy of the fixed connector counters, for tests and diagnostics.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    /// Dedupe claims written to the inbox.
    pub inbox_claims_written: u64,
    /// Duplicates rejected, across both lookup paths.
    pub inbox_duplicates_rejected: u64,
    /// Duplicates caught on the fast path.
    pub inbox_fast_path_hits: u64,
    /// Duplicates caught on the durable path.
    pub inbox_durable_hits: u64,
    /// Inbox entries that expired.
    pub inbox_expired_entries: u64,
    /// Inbox entries currently live.
    pub inbox_active_entries: u64,
    /// Linear requests rejected on timestamp.
    pub linear_timestamp_rejections_total: u64,
    /// Successful dispatches.
    pub dispatch_succeeded_total: u64,
    /// Failed dispatches.
    pub dispatch_failed_total: u64,
    /// Retries scheduled.
    pub retry_scheduled_total: u64,
    /// Successful Slack deliveries.
    pub slack_delivery_success_total: u64,
    /// Failed Slack deliveries.
    pub slack_delivery_failure_total: u64,
}
361
/// Label set of one metric series: label name to value, kept in sorted key order.
type MetricLabels = std::collections::BTreeMap<String, String>;
363
/// Accumulated state of a single histogram series.
#[derive(Debug, Default, Clone, PartialEq)]
struct HistogramMetric {
    /// Per-bucket observation counts (keys presumably the bucket bound rendered as text;
    /// confirm).
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of observed values.
    sum: f64,
}
370
371static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
372
373pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
374    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
375    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
376}
377
378pub fn clear_active_metrics_registry() {
379    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
380        *slot.lock().expect("active metrics registry poisoned") = None;
381    }
382}
383
384pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
385    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
386        slot.lock()
387            .expect("active metrics registry poisoned")
388            .clone()
389    })
390}
391
392/// Shared metrics surface for connector-local counters and timings.
393#[derive(Debug, Default)]
394pub struct MetricsRegistry {
395    inbox_claims_written: AtomicU64,
396    inbox_duplicates_rejected: AtomicU64,
397    inbox_fast_path_hits: AtomicU64,
398    inbox_durable_hits: AtomicU64,
399    inbox_expired_entries: AtomicU64,
400    inbox_active_entries: AtomicU64,
401    linear_timestamp_rejections_total: AtomicU64,
402    dispatch_succeeded_total: AtomicU64,
403    dispatch_failed_total: AtomicU64,
404    retry_scheduled_total: AtomicU64,
405    slack_delivery_success_total: AtomicU64,
406    slack_delivery_failure_total: AtomicU64,
407    custom_counters: Mutex<BTreeMap<String, u64>>,
408    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
409    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
410    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
411    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
412}
413
414impl MetricsRegistry {
415    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
416    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
417        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
418    ];
419    const SIZE_BUCKETS: [f64; 9] = [
420        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
421    ];
422
423    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
424        ConnectorMetricsSnapshot {
425            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
426            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
427            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
428            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
429            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
430            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
431            linear_timestamp_rejections_total: self
432                .linear_timestamp_rejections_total
433                .load(Ordering::Relaxed),
434            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
435            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
436            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
437            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
438            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
439        }
440    }
441
442    pub(crate) fn record_inbox_claim(&self) {
443        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
444    }
445
446    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
447        self.inbox_duplicates_rejected
448            .fetch_add(1, Ordering::Relaxed);
449        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
450    }
451
452    pub(crate) fn record_inbox_duplicate_durable(&self) {
453        self.inbox_duplicates_rejected
454            .fetch_add(1, Ordering::Relaxed);
455        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
456    }
457
458    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
459        if count > 0 {
460            self.inbox_expired_entries
461                .fetch_add(count, Ordering::Relaxed);
462        }
463    }
464
465    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
466        self.inbox_active_entries
467            .store(count as u64, Ordering::Relaxed);
468    }
469
470    pub fn record_linear_timestamp_rejection(&self) {
471        self.linear_timestamp_rejections_total
472            .fetch_add(1, Ordering::Relaxed);
473    }
474
475    pub fn record_dispatch_succeeded(&self) {
476        self.dispatch_succeeded_total
477            .fetch_add(1, Ordering::Relaxed);
478    }
479
480    pub fn record_dispatch_failed(&self) {
481        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
482    }
483
484    pub fn record_retry_scheduled(&self) {
485        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
486    }
487
488    pub fn record_slack_delivery_success(&self) {
489        self.slack_delivery_success_total
490            .fetch_add(1, Ordering::Relaxed);
491    }
492
493    pub fn record_slack_delivery_failure(&self) {
494        self.slack_delivery_failure_total
495            .fetch_add(1, Ordering::Relaxed);
496    }
497
498    pub fn record_custom_counter(&self, name: &str, amount: u64) {
499        if amount == 0 {
500            return;
501        }
502        let mut counters = self
503            .custom_counters
504            .lock()
505            .expect("custom counters poisoned");
506        *counters.entry(name.to_string()).or_default() += amount;
507    }
508
509    pub fn record_http_request(
510        &self,
511        endpoint: &str,
512        method: &str,
513        status: u16,
514        duration: StdDuration,
515        body_size_bytes: usize,
516    ) {
517        self.increment_counter(
518            "harn_http_requests_total",
519            labels([
520                ("endpoint", endpoint),
521                ("method", method),
522                ("status", &status.to_string()),
523            ]),
524            1,
525        );
526        self.observe_histogram(
527            "harn_http_request_duration_seconds",
528            labels([("endpoint", endpoint)]),
529            duration.as_secs_f64(),
530            &Self::DURATION_BUCKETS,
531        );
532        self.observe_histogram(
533            "harn_http_body_size_bytes",
534            labels([("endpoint", endpoint)]),
535            body_size_bytes as f64,
536            &Self::SIZE_BUCKETS,
537        );
538    }
539
540    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
541        self.increment_counter(
542            "harn_trigger_received_total",
543            labels([("trigger_id", trigger_id), ("provider", provider)]),
544            1,
545        );
546    }
547
548    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
549        self.increment_counter(
550            "harn_trigger_deduped_total",
551            labels([("trigger_id", trigger_id), ("reason", reason)]),
552            1,
553        );
554    }
555
556    pub fn record_trigger_predicate_evaluation(
557        &self,
558        trigger_id: &str,
559        result: bool,
560        cost_usd: f64,
561    ) {
562        self.increment_counter(
563            "harn_trigger_predicate_evaluations_total",
564            labels([
565                ("trigger_id", trigger_id),
566                ("result", if result { "true" } else { "false" }),
567            ]),
568            1,
569        );
570        self.observe_histogram(
571            "harn_trigger_predicate_cost_usd",
572            labels([("trigger_id", trigger_id)]),
573            cost_usd.max(0.0),
574            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
575        );
576    }
577
578    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
579        self.increment_counter(
580            "harn_trigger_dispatched_total",
581            labels([
582                ("trigger_id", trigger_id),
583                ("handler_kind", handler_kind),
584                ("outcome", outcome),
585            ]),
586            1,
587        );
588    }
589
590    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
591        self.increment_counter(
592            "harn_trigger_retries_total",
593            labels([
594                ("trigger_id", trigger_id),
595                ("attempt", &attempt.to_string()),
596            ]),
597            1,
598        );
599    }
600
601    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
602        self.increment_counter(
603            "harn_trigger_dlq_total",
604            labels([("trigger_id", trigger_id), ("reason", reason)]),
605            1,
606        );
607    }
608
609    pub fn record_trigger_accepted_to_normalized(
610        &self,
611        trigger_id: &str,
612        binding_key: &str,
613        provider: &str,
614        tenant_id: Option<&str>,
615        status: &str,
616        duration: StdDuration,
617    ) {
618        self.observe_histogram(
619            "harn_trigger_webhook_accepted_to_normalized_seconds",
620            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
621            duration.as_secs_f64(),
622            &Self::TRIGGER_LATENCY_BUCKETS,
623        );
624    }
625
626    pub fn record_trigger_accepted_to_queue_append(
627        &self,
628        trigger_id: &str,
629        binding_key: &str,
630        provider: &str,
631        tenant_id: Option<&str>,
632        status: &str,
633        duration: StdDuration,
634    ) {
635        self.observe_histogram(
636            "harn_trigger_webhook_accepted_to_queue_append_seconds",
637            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
638            duration.as_secs_f64(),
639            &Self::TRIGGER_LATENCY_BUCKETS,
640        );
641    }
642
643    pub fn record_trigger_queue_age_at_dispatch_admission(
644        &self,
645        trigger_id: &str,
646        binding_key: &str,
647        provider: &str,
648        tenant_id: Option<&str>,
649        status: &str,
650        age: StdDuration,
651    ) {
652        self.observe_histogram(
653            "harn_trigger_queue_age_at_dispatch_admission_seconds",
654            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
655            age.as_secs_f64(),
656            &Self::TRIGGER_LATENCY_BUCKETS,
657        );
658    }
659
660    pub fn record_trigger_queue_age_at_dispatch_start(
661        &self,
662        trigger_id: &str,
663        binding_key: &str,
664        provider: &str,
665        tenant_id: Option<&str>,
666        status: &str,
667        age: StdDuration,
668    ) {
669        self.observe_histogram(
670            "harn_trigger_queue_age_at_dispatch_start_seconds",
671            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
672            age.as_secs_f64(),
673            &Self::TRIGGER_LATENCY_BUCKETS,
674        );
675    }
676
677    pub fn record_trigger_dispatch_runtime(
678        &self,
679        trigger_id: &str,
680        binding_key: &str,
681        provider: &str,
682        tenant_id: Option<&str>,
683        status: &str,
684        duration: StdDuration,
685    ) {
686        self.observe_histogram(
687            "harn_trigger_dispatch_runtime_seconds",
688            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
689            duration.as_secs_f64(),
690            &Self::TRIGGER_LATENCY_BUCKETS,
691        );
692    }
693
694    pub fn record_trigger_retry_delay(
695        &self,
696        trigger_id: &str,
697        binding_key: &str,
698        provider: &str,
699        tenant_id: Option<&str>,
700        status: &str,
701        duration: StdDuration,
702    ) {
703        self.observe_histogram(
704            "harn_trigger_retry_delay_seconds",
705            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
706            duration.as_secs_f64(),
707            &Self::TRIGGER_LATENCY_BUCKETS,
708        );
709    }
710
711    pub fn record_trigger_accepted_to_dlq(
712        &self,
713        trigger_id: &str,
714        binding_key: &str,
715        provider: &str,
716        tenant_id: Option<&str>,
717        status: &str,
718        duration: StdDuration,
719    ) {
720        self.observe_histogram(
721            "harn_trigger_accepted_to_dlq_seconds",
722            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
723            duration.as_secs_f64(),
724            &Self::TRIGGER_LATENCY_BUCKETS,
725        );
726    }
727
728    pub fn note_trigger_pending_event(
729        &self,
730        event_id: &str,
731        trigger_id: &str,
732        binding_key: &str,
733        provider: &str,
734        tenant_id: Option<&str>,
735        accepted_at_ms: i64,
736        now_ms: i64,
737    ) {
738        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
739        {
740            let mut pending = self
741                .pending_trigger_events
742                .lock()
743                .expect("pending trigger events poisoned");
744            pending
745                .entry(labels.clone())
746                .or_default()
747                .insert(event_id.to_string(), accepted_at_ms);
748        }
749        self.refresh_oldest_pending_gauge(labels, now_ms);
750    }
751
752    pub fn clear_trigger_pending_event(
753        &self,
754        event_id: &str,
755        trigger_id: &str,
756        binding_key: &str,
757        provider: &str,
758        tenant_id: Option<&str>,
759        now_ms: i64,
760    ) {
761        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
762        {
763            let mut pending = self
764                .pending_trigger_events
765                .lock()
766                .expect("pending trigger events poisoned");
767            if let Some(events) = pending.get_mut(&labels) {
768                events.remove(event_id);
769                if events.is_empty() {
770                    pending.remove(&labels);
771                }
772            }
773        }
774        self.refresh_oldest_pending_gauge(labels, now_ms);
775    }
776
777    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
778        self.set_gauge(
779            "harn_trigger_inflight",
780            labels([("trigger_id", trigger_id)]),
781            count as f64,
782        );
783    }
784
785    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
786        self.set_gauge(
787            "harn_trigger_budget_cost_today_usd",
788            labels([("trigger_id", trigger_id)]),
789            cost_usd.max(0.0),
790        );
791    }
792
793    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
794        self.increment_counter(
795            "harn_trigger_budget_exhausted_total",
796            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
797            1,
798        );
799    }
800
801    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
802        self.increment_counter(
803            "harn_backpressure_events_total",
804            labels([("dimension", dimension), ("action", action)]),
805            1,
806        );
807    }
808
809    pub fn record_event_log_append(
810        &self,
811        topic: &str,
812        duration: StdDuration,
813        payload_bytes: usize,
814    ) {
815        self.observe_histogram(
816            "harn_event_log_append_duration_seconds",
817            labels([("topic", topic)]),
818            duration.as_secs_f64(),
819            &Self::DURATION_BUCKETS,
820        );
821        self.set_gauge(
822            "harn_event_log_topic_size_bytes",
823            labels([("topic", topic)]),
824            payload_bytes as f64,
825        );
826    }
827
828    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
829        self.set_gauge(
830            "harn_event_log_consumer_lag",
831            labels([("topic", topic), ("consumer", consumer)]),
832            lag as f64,
833        );
834    }
835
836    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
837        self.increment_counter(
838            "harn_a2a_hops_total",
839            labels([("target", target), ("outcome", outcome)]),
840            1,
841        );
842        self.observe_histogram(
843            "harn_a2a_hop_duration_seconds",
844            labels([("target", target)]),
845            duration.as_secs_f64(),
846            &Self::DURATION_BUCKETS,
847        );
848    }
849
850    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
851        self.set_gauge(
852            "harn_worker_queue_depth",
853            labels([("queue", queue)]),
854            depth as f64,
855        );
856    }
857
858    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
859        self.observe_histogram(
860            "harn_worker_queue_claim_age_seconds",
861            labels([("queue", queue)]),
862            age_seconds.max(0.0),
863            &Self::DURATION_BUCKETS,
864        );
865    }
866
867    /// Increment the scheduler-selection counter for a particular fairness key.
868    pub fn record_scheduler_selection(
869        &self,
870        queue: &str,
871        fairness_dimension: &str,
872        fairness_key: &str,
873    ) {
874        self.increment_counter(
875            "harn_scheduler_selections_total",
876            labels([
877                ("queue", queue),
878                ("fairness_dimension", fairness_dimension),
879                ("fairness_key", fairness_key),
880            ]),
881            1,
882        );
883    }
884
885    /// Increment the scheduler-deferred counter (queue had work but couldn't
886    /// be selected because the key was at its concurrency cap).
887    pub fn record_scheduler_deferral(
888        &self,
889        queue: &str,
890        fairness_dimension: &str,
891        fairness_key: &str,
892    ) {
893        self.increment_counter(
894            "harn_scheduler_deferrals_total",
895            labels([
896                ("queue", queue),
897                ("fairness_dimension", fairness_dimension),
898                ("fairness_key", fairness_key),
899            ]),
900            1,
901        );
902    }
903
904    /// Increment the scheduler starvation-promotion counter.
905    pub fn record_scheduler_starvation_promotion(
906        &self,
907        queue: &str,
908        fairness_dimension: &str,
909        fairness_key: &str,
910    ) {
911        self.increment_counter(
912            "harn_scheduler_starvation_promotions_total",
913            labels([
914                ("queue", queue),
915                ("fairness_dimension", fairness_dimension),
916                ("fairness_key", fairness_key),
917            ]),
918            1,
919        );
920    }
921
922    /// Set the current scheduler deficit gauge for a fairness key.
923    pub fn set_scheduler_deficit(
924        &self,
925        queue: &str,
926        fairness_dimension: &str,
927        fairness_key: &str,
928        deficit: i64,
929    ) {
930        self.set_gauge(
931            "harn_scheduler_deficit",
932            labels([
933                ("queue", queue),
934                ("fairness_dimension", fairness_dimension),
935                ("fairness_key", fairness_key),
936            ]),
937            deficit as f64,
938        );
939    }
940
941    /// Set the oldest-eligible-job-age gauge for a fairness key (seconds).
942    pub fn set_scheduler_oldest_eligible_age(
943        &self,
944        queue: &str,
945        fairness_dimension: &str,
946        fairness_key: &str,
947        age_ms: u64,
948    ) {
949        self.set_gauge(
950            "harn_scheduler_oldest_eligible_age_seconds",
951            labels([
952                ("queue", queue),
953                ("fairness_dimension", fairness_dimension),
954                ("fairness_key", fairness_key),
955            ]),
956            age_ms as f64 / 1000.0,
957        );
958    }
959
960    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
961        self.set_gauge(
962            "harn_orchestrator_pump_backlog",
963            labels([("topic", topic)]),
964            count as f64,
965        );
966    }
967
968    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
969        self.set_gauge(
970            "harn_orchestrator_pump_outstanding",
971            labels([("topic", topic)]),
972            count as f64,
973        );
974    }
975
976    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
977        self.observe_histogram(
978            "harn_orchestrator_pump_admission_delay_seconds",
979            labels([("topic", topic)]),
980            duration.as_secs_f64(),
981            &Self::DURATION_BUCKETS,
982        );
983    }
984
985    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
986        self.increment_counter(
987            "harn_llm_calls_total",
988            labels([
989                ("provider", provider),
990                ("model", model),
991                ("outcome", outcome),
992            ]),
993            1,
994        );
995        if cost_usd > 0.0 {
996            self.increment_counter(
997                "harn_llm_cost_usd_total",
998                labels([("provider", provider), ("model", model)]),
999                cost_usd,
1000            );
1001        } else {
1002            self.ensure_counter(
1003                "harn_llm_cost_usd_total",
1004                labels([("provider", provider), ("model", model)]),
1005            );
1006        }
1007    }
1008
1009    pub fn record_llm_cache_hit(&self, provider: &str) {
1010        self.increment_counter(
1011            "harn_llm_cache_hits_total",
1012            labels([("provider", provider)]),
1013            1,
1014        );
1015    }
1016
1017    /// Track LLM streaming responses aborted mid-stream by
1018    /// `schema_stream_abort` because the partial JSON cannot satisfy
1019    /// `output_schema`. Counter is labelled by `(provider, model)` so
1020    /// dashboards can attribute the savings — each abort short-circuits
1021    /// a provider stream that would otherwise have run to completion.
1022    pub fn record_schema_stream_aborted(&self, provider: &str, model: &str) {
1023        self.increment_counter(
1024            "harn_llm_schema_stream_aborted_total",
1025            labels([("provider", provider), ("model", model)]),
1026            1,
1027        );
1028    }
1029
1030    pub fn render_prometheus(&self) -> String {
1031        let snapshot = self.snapshot();
1032        let counters = [
1033            (
1034                "connector_linear_timestamp_rejections_total",
1035                snapshot.linear_timestamp_rejections_total,
1036            ),
1037            (
1038                "dispatch_succeeded_total",
1039                snapshot.dispatch_succeeded_total,
1040            ),
1041            ("dispatch_failed_total", snapshot.dispatch_failed_total),
1042            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1043            ("retry_scheduled_total", snapshot.retry_scheduled_total),
1044            (
1045                "slack_events_delivery_success_total",
1046                snapshot.slack_delivery_success_total,
1047            ),
1048            (
1049                "slack_events_delivery_failure_total",
1050                snapshot.slack_delivery_failure_total,
1051            ),
1052        ];
1053
1054        let mut rendered = String::new();
1055        for (name, value) in counters {
1056            rendered.push_str("# TYPE ");
1057            rendered.push_str(name);
1058            rendered.push_str(" counter\n");
1059            rendered.push_str(name);
1060            rendered.push(' ');
1061            rendered.push_str(&value.to_string());
1062            rendered.push('\n');
1063        }
1064        let custom_counters = self
1065            .custom_counters
1066            .lock()
1067            .expect("custom counters poisoned");
1068        for (name, value) in custom_counters.iter() {
1069            let metric_name = format!(
1070                "connector_custom_{}_total",
1071                name.chars()
1072                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1073                        ch
1074                    } else {
1075                        '_'
1076                    })
1077                    .collect::<String>()
1078            );
1079            rendered.push_str("# TYPE ");
1080            rendered.push_str(&metric_name);
1081            rendered.push_str(" counter\n");
1082            rendered.push_str(&metric_name);
1083            rendered.push(' ');
1084            rendered.push_str(&value.to_string());
1085            rendered.push('\n');
1086        }
1087        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1088        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1089        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1090        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1091        self.render_generic_metrics(&mut rendered);
1092        rendered
1093    }
1094
1095    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1096        let amount = amount.into();
1097        if amount <= 0.0 || !amount.is_finite() {
1098            return;
1099        }
1100        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1101        *counters.entry((name.to_string(), labels)).or_default() += amount;
1102    }
1103
1104    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1105        let mut counters = self.counters.lock().expect("metrics counters poisoned");
1106        counters.entry((name.to_string(), labels)).or_default();
1107    }
1108
1109    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1110        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1111        gauges.insert((name.to_string(), labels), value);
1112    }
1113
1114    fn observe_histogram(
1115        &self,
1116        name: &str,
1117        labels: MetricLabels,
1118        value: f64,
1119        bucket_bounds: &[f64],
1120    ) {
1121        if !value.is_finite() {
1122            return;
1123        }
1124        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1125        let histogram = histograms
1126            .entry((name.to_string(), labels))
1127            .or_insert_with(|| HistogramMetric {
1128                buckets: bucket_bounds
1129                    .iter()
1130                    .map(|bound| (prometheus_float(*bound), 0))
1131                    .chain(std::iter::once(("+Inf".to_string(), 0)))
1132                    .collect(),
1133                count: 0,
1134                sum: 0.0,
1135            });
1136        histogram.count += 1;
1137        histogram.sum += value;
1138        for bound in bucket_bounds {
1139            if value <= *bound {
1140                let key = prometheus_float(*bound);
1141                *histogram.buckets.entry(key).or_default() += 1;
1142            }
1143        }
1144        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1145    }
1146
1147    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1148        let oldest_accepted_at_ms = self
1149            .pending_trigger_events
1150            .lock()
1151            .expect("pending trigger events poisoned")
1152            .get(&labels)
1153            .and_then(|events| events.values().min().copied());
1154        let age_seconds = oldest_accepted_at_ms
1155            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1156            .unwrap_or(0.0);
1157        self.set_gauge(
1158            "harn_trigger_oldest_pending_age_seconds",
1159            labels,
1160            age_seconds,
1161        );
1162    }
1163
1164    fn render_generic_metrics(&self, rendered: &mut String) {
1165        let counters = self
1166            .counters
1167            .lock()
1168            .expect("metrics counters poisoned")
1169            .clone();
1170        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1171        let histograms = self
1172            .histograms
1173            .lock()
1174            .expect("metrics histograms poisoned")
1175            .clone();
1176
1177        for name in metric_family_names(MetricKind::Counter) {
1178            rendered.push_str("# TYPE ");
1179            rendered.push_str(name);
1180            rendered.push_str(" counter\n");
1181            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1182                render_sample(rendered, sample_name, labels, *value);
1183            }
1184        }
1185        for name in metric_family_names(MetricKind::Gauge) {
1186            rendered.push_str("# TYPE ");
1187            rendered.push_str(name);
1188            rendered.push_str(" gauge\n");
1189            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1190                render_sample(rendered, sample_name, labels, *value);
1191            }
1192        }
1193        for name in metric_family_names(MetricKind::Histogram) {
1194            rendered.push_str("# TYPE ");
1195            rendered.push_str(name);
1196            rendered.push_str(" histogram\n");
1197            for ((sample_name, labels), histogram) in
1198                histograms.iter().filter(|((n, _), _)| n == name)
1199            {
1200                for (le, value) in &histogram.buckets {
1201                    let mut bucket_labels = labels.clone();
1202                    bucket_labels.insert("le".to_string(), le.clone());
1203                    render_sample(
1204                        rendered,
1205                        &format!("{sample_name}_bucket"),
1206                        &bucket_labels,
1207                        *value as f64,
1208                    );
1209                }
1210                render_sample(
1211                    rendered,
1212                    &format!("{sample_name}_sum"),
1213                    labels,
1214                    histogram.sum,
1215                );
1216                render_sample(
1217                    rendered,
1218                    &format!("{sample_name}_count"),
1219                    labels,
1220                    histogram.count as f64,
1221                );
1222            }
1223        }
1224    }
1225}
1226
/// Kind of metric family; selects which family list `metric_family_names`
/// returns.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Ordered list of the labelled `harn_*` metric families of the given kind.
///
/// `render_generic_metrics` emits families in exactly this order. Samples
/// stored under a name missing from these lists are never rendered.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTERS: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
        "harn_llm_schema_stream_aborted_total",
        "harn_scheduler_selections_total",
        "harn_scheduler_deferrals_total",
        "harn_scheduler_starvation_promotions_total",
    ];
    const GAUGES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
        "harn_scheduler_deficit",
        "harn_scheduler_oldest_eligible_age_seconds",
    ];
    const HISTOGRAMS: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];

    match kind {
        MetricKind::Counter => COUNTERS,
        MetricKind::Gauge => GAUGES,
        MetricKind::Histogram => HISTOGRAMS,
    }
}
1285
1286fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1287    pairs
1288        .into_iter()
1289        .map(|(name, value)| (name.to_string(), value.to_string()))
1290        .collect()
1291}
1292
1293fn trigger_lifecycle_labels(
1294    trigger_id: &str,
1295    binding_key: &str,
1296    provider: &str,
1297    tenant_id: Option<&str>,
1298    status: &str,
1299) -> MetricLabels {
1300    labels([
1301        ("binding_key", binding_key),
1302        ("provider", provider),
1303        ("status", status),
1304        ("tenant_id", tenant_label(tenant_id)),
1305        ("trigger_id", trigger_id),
1306    ])
1307}
1308
1309fn trigger_pending_labels(
1310    trigger_id: &str,
1311    binding_key: &str,
1312    provider: &str,
1313    tenant_id: Option<&str>,
1314) -> MetricLabels {
1315    labels([
1316        ("binding_key", binding_key),
1317        ("provider", provider),
1318        ("tenant_id", tenant_label(tenant_id)),
1319        ("trigger_id", trigger_id),
1320    ])
1321}
1322
/// Normalize an optional tenant id into a label value.
///
/// Surrounding whitespace is trimmed. A missing or blank tenant becomes
/// `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1329
/// Duration from `earlier_ms` to `later_ms` (both in milliseconds).
///
/// The subtraction saturates instead of overflowing. A negative difference
/// clamps to zero.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    let non_negative_ms = u64::try_from(elapsed_ms).unwrap_or(0);
    StdDuration::from_millis(non_negative_ms)
}
1333
1334fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1335    rendered.push_str(name);
1336    if !labels.is_empty() {
1337        rendered.push('{');
1338        for (index, (label, label_value)) in labels.iter().enumerate() {
1339            if index > 0 {
1340                rendered.push(',');
1341            }
1342            rendered.push_str(label);
1343            rendered.push_str("=\"");
1344            rendered.push_str(&escape_label_value(label_value));
1345            rendered.push('"');
1346        }
1347        rendered.push('}');
1348    }
1349    rendered.push(' ');
1350    rendered.push_str(&prometheus_float(value));
1351    rendered.push('\n');
1352}
1353
/// Escape a label value for the Prometheus text format.
///
/// Backslash, double quote and line feed are the three characters the
/// format requires escaping; they become `\\`, `\"` and `\n`. Everything
/// else is copied unchanged.
///
/// Writes into a single preallocated `String`; the previous version built a
/// temporary `Vec<char>` for every input character.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1365
/// Format a float as a Prometheus sample value or `le` bucket bound.
///
/// Special values use the spellings the text format requires: `+Inf`,
/// `-Inf` and `NaN`. Previously negative infinity was emitted as Rust's
/// `-inf`.
///
/// Integral values render without a decimal point. Other values are rounded
/// to 6 decimal places, with trailing zeros (and a dangling `.`) stripped.
/// Note: anything smaller than 0.0000005 in magnitude therefore renders
/// as `0` (or `-0`).
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1380
1381/// Provider payload schema metadata exposed by a connector.
1382#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1383pub struct ProviderPayloadSchema {
1384    pub harn_schema_name: String,
1385    #[serde(default)]
1386    pub json_schema: JsonValue,
1387}
1388
1389impl ProviderPayloadSchema {
1390    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1391        Self {
1392            harn_schema_name: harn_schema_name.into(),
1393            json_schema,
1394        }
1395    }
1396
1397    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1398        Self::new(harn_schema_name, JsonValue::Null)
1399    }
1400}
1401
1402impl Default for ProviderPayloadSchema {
1403    fn default() -> Self {
1404        Self::named("raw")
1405    }
1406}
1407
1408/// High-level transport kind a connector supports.
1409#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1410#[serde(transparent)]
1411pub struct TriggerKind(String);
1412
1413impl TriggerKind {
1414    pub fn new(value: impl Into<String>) -> Self {
1415        Self(value.into())
1416    }
1417
1418    pub fn as_str(&self) -> &str {
1419        self.0.as_str()
1420    }
1421}
1422
1423impl From<&str> for TriggerKind {
1424    fn from(value: &str) -> Self {
1425        Self::new(value)
1426    }
1427}
1428
1429impl From<String> for TriggerKind {
1430    fn from(value: String) -> Self {
1431        Self::new(value)
1432    }
1433}
1434
1435/// Future trigger manifest binding routed to a connector activation.
1436#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1437pub struct TriggerBinding {
1438    pub provider: ProviderId,
1439    pub kind: TriggerKind,
1440    pub binding_id: String,
1441    #[serde(default)]
1442    pub dedupe_key: Option<String>,
1443    #[serde(default = "default_dedupe_retention_days")]
1444    pub dedupe_retention_days: u32,
1445    #[serde(default)]
1446    pub config: JsonValue,
1447}
1448
1449impl TriggerBinding {
1450    pub fn new(
1451        provider: ProviderId,
1452        kind: impl Into<TriggerKind>,
1453        binding_id: impl Into<String>,
1454    ) -> Self {
1455        Self {
1456            provider,
1457            kind: kind.into(),
1458            binding_id: binding_id.into(),
1459            dedupe_key: None,
1460            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1461            config: JsonValue::Null,
1462        }
1463    }
1464}
1465
1466fn default_dedupe_retention_days() -> u32 {
1467    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1468}
1469
1470/// Small in-memory trigger-binding registry used to fan bindings into connectors.
1471#[derive(Clone, Debug, Default)]
1472pub struct TriggerRegistry {
1473    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1474}
1475
1476impl TriggerRegistry {
1477    pub fn register(&mut self, binding: TriggerBinding) {
1478        self.bindings
1479            .entry(binding.provider.clone())
1480            .or_default()
1481            .push(binding);
1482    }
1483
1484    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1485        &self.bindings
1486    }
1487
1488    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1489        self.bindings
1490            .get(provider)
1491            .map(Vec::as_slice)
1492            .unwrap_or(&[])
1493    }
1494}
1495
1496/// Metadata returned from connector activation.
1497#[derive(Clone, Debug, PartialEq, Eq)]
1498pub struct ActivationHandle {
1499    pub provider: ProviderId,
1500    pub binding_count: usize,
1501}
1502
1503impl ActivationHandle {
1504    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1505        Self {
1506            provider,
1507            binding_count,
1508        }
1509    }
1510}
1511
1512/// Provider-native inbound request payload preserved as raw bytes.
1513#[derive(Clone, Debug, PartialEq)]
1514pub struct RawInbound {
1515    pub kind: String,
1516    pub headers: BTreeMap<String, String>,
1517    pub query: BTreeMap<String, String>,
1518    pub body: Vec<u8>,
1519    pub received_at: OffsetDateTime,
1520    pub occurred_at: Option<OffsetDateTime>,
1521    pub tenant_id: Option<TenantId>,
1522    pub metadata: JsonValue,
1523}
1524
1525impl RawInbound {
1526    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1527        Self {
1528            kind: kind.into(),
1529            headers,
1530            query: BTreeMap::new(),
1531            body,
1532            received_at: clock::now_utc(),
1533            occurred_at: None,
1534            tenant_id: None,
1535            metadata: JsonValue::Null,
1536        }
1537    }
1538
1539    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1540        Ok(serde_json::from_slice(&self.body)?)
1541    }
1542}
1543
/// Token-bucket parameters shared across connector clients: bucket
/// `capacity`, plus `refill_tokens` added every `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// A 60-token bucket that refills one token per second.
    fn default() -> Self {
        let one_second = StdDuration::from_secs(1);
        Self {
            refill_interval: one_second,
            refill_tokens: 1,
            capacity: 60,
        }
    }
}
1561
1562#[derive(Clone, Debug)]
1563struct TokenBucket {
1564    tokens: f64,
1565    last_refill: ClockInstant,
1566}
1567
1568impl TokenBucket {
1569    fn full(config: RateLimitConfig) -> Self {
1570        Self {
1571            tokens: config.capacity as f64,
1572            last_refill: clock::instant_now(),
1573        }
1574    }
1575
1576    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1577        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1578        let rate = config.refill_tokens.max(1) as f64 / interval;
1579        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1580        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1581        self.last_refill = now;
1582    }
1583
1584    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1585        self.refill(config, now);
1586        if self.tokens >= 1.0 {
1587            self.tokens -= 1.0;
1588            true
1589        } else {
1590            false
1591        }
1592    }
1593
1594    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1595        if self.tokens >= 1.0 {
1596            return StdDuration::ZERO;
1597        }
1598        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1599        let rate = config.refill_tokens.max(1) as f64 / interval;
1600        let missing = (1.0 - self.tokens).max(0.0);
1601        StdDuration::from_secs_f64((missing / rate).max(0.001))
1602    }
1603}
1604
1605/// Shared per-provider, per-key token bucket factory for outbound connector clients.
1606#[derive(Debug)]
1607pub struct RateLimiterFactory {
1608    config: RateLimitConfig,
1609    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1610}
1611
1612impl RateLimiterFactory {
1613    pub fn new(config: RateLimitConfig) -> Self {
1614        Self {
1615            config,
1616            buckets: Mutex::new(HashMap::new()),
1617        }
1618    }
1619
1620    pub fn config(&self) -> RateLimitConfig {
1621        self.config
1622    }
1623
1624    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1625        ScopedRateLimiter {
1626            factory: self,
1627            provider: provider.clone(),
1628            key: key.into(),
1629        }
1630    }
1631
1632    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1633        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1634        let bucket = buckets
1635            .entry((provider.as_str().to_string(), key.to_string()))
1636            .or_insert_with(|| TokenBucket::full(self.config));
1637        bucket.try_acquire(self.config, clock::instant_now())
1638    }
1639
1640    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1641        loop {
1642            let wait = {
1643                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1644                let bucket = buckets
1645                    .entry((provider.as_str().to_string(), key.to_string()))
1646                    .or_insert_with(|| TokenBucket::full(self.config));
1647                if bucket.try_acquire(self.config, clock::instant_now()) {
1648                    return;
1649                }
1650                bucket.wait_duration(self.config)
1651            };
1652            // Honor the unified mock clock so tests that pin time via
1653            // `mock_time(...)` don't deadlock here: the bucket reads
1654            // `instant_now()` (mocked), and this sleep advances the same
1655            // mock instead of waiting on a wall-clock that never moves.
1656            clock::sleep(wait).await;
1657        }
1658    }
1659}
1660
1661impl Default for RateLimiterFactory {
1662    fn default() -> Self {
1663        Self::new(RateLimitConfig::default())
1664    }
1665}
1666
1667/// Borrowed view onto a single provider/key rate-limit scope.
1668#[derive(Clone, Debug)]
1669pub struct ScopedRateLimiter<'a> {
1670    factory: &'a RateLimiterFactory,
1671    provider: ProviderId,
1672    key: String,
1673}
1674
1675impl<'a> ScopedRateLimiter<'a> {
1676    pub fn try_acquire(&self) -> bool {
1677        self.factory.try_acquire(&self.provider, &self.key)
1678    }
1679
1680    pub async fn acquire(&self) {
1681        self.factory.acquire(&self.provider, &self.key).await;
1682    }
1683}
1684
1685/// Runtime connector registry keyed by provider id.
1686pub struct ConnectorRegistry {
1687    connectors: BTreeMap<ProviderId, ConnectorHandle>,
1688}
1689
1690impl ConnectorRegistry {
1691    pub fn empty() -> Self {
1692        Self {
1693            connectors: BTreeMap::new(),
1694        }
1695    }
1696
1697    pub fn with_defaults() -> Self {
1698        let mut registry = Self::empty();
1699        for provider in registered_provider_metadata() {
1700            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1701                continue;
1702            }
1703            registry
1704                .register(default_connector_for_provider(&provider))
1705                .expect("default connector registration should not fail");
1706        }
1707        registry
1708    }
1709
1710    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1711        let provider = connector.provider_id().clone();
1712        if self.connectors.contains_key(&provider) {
1713            return Err(ConnectorError::DuplicateProvider(provider.0));
1714        }
1715        self.connectors
1716            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1717        Ok(())
1718    }
1719
1720    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1721        self.connectors.get(id).cloned()
1722    }
1723
1724    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1725        self.connectors.remove(id)
1726    }
1727
1728    pub fn list(&self) -> Vec<ProviderId> {
1729        self.connectors.keys().cloned().collect()
1730    }
1731
1732    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1733        for connector in self.connectors.values() {
1734            connector.lock().await.init(ctx.clone()).await?;
1735        }
1736        Ok(())
1737    }
1738
1739    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1740        let mut clients = BTreeMap::new();
1741        for (provider, connector) in &self.connectors {
1742            let client = connector.lock().await.client();
1743            clients.insert(provider.clone(), client);
1744        }
1745        clients
1746    }
1747
1748    pub async fn activate_all(
1749        &self,
1750        registry: &TriggerRegistry,
1751    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1752        let mut handles = Vec::new();
1753        for (provider, connector) in &self.connectors {
1754            let bindings = registry.bindings_for(provider);
1755            if bindings.is_empty() {
1756                continue;
1757            }
1758            let connector = connector.lock().await;
1759            handles.push(connector.activate(bindings).await?);
1760        }
1761        Ok(handles)
1762    }
1763}
1764
1765impl Default for ConnectorRegistry {
1766    fn default() -> Self {
1767        Self::with_defaults()
1768    }
1769}
1770
1771fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1772    match &provider.runtime {
1773        ProviderRuntimeMetadata::Builtin {
1774            connector,
1775            default_signature_variant,
1776        } => match connector.as_str() {
1777            "a2a-push" => Box::new(A2aPushConnector::new()),
1778            "cron" => Box::new(CronConnector::new()),
1779            "stream" => Box::new(StreamConnector::new(
1780                ProviderId::from(provider.provider.clone()),
1781                provider.schema_name.clone(),
1782            )),
1783            "webhook" => {
1784                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1785                    .expect("catalog webhook signature variant must be valid");
1786                Box::new(GenericWebhookConnector::with_profile(
1787                    WebhookProviderProfile::new(
1788                        ProviderId::from(provider.provider.clone()),
1789                        provider.schema_name.clone(),
1790                        variant,
1791                    ),
1792                ))
1793            }
1794            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1795        },
1796        ProviderRuntimeMetadata::Placeholder => {
1797            Box::new(PlaceholderConnector::from_metadata(provider))
1798        }
1799    }
1800}
1801
1802struct PlaceholderConnector {
1803    provider_id: ProviderId,
1804    kinds: Vec<TriggerKind>,
1805    schema_name: String,
1806}
1807
1808impl PlaceholderConnector {
1809    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1810        Self {
1811            provider_id: ProviderId::from(metadata.provider.clone()),
1812            kinds: metadata
1813                .kinds
1814                .iter()
1815                .cloned()
1816                .map(TriggerKind::from)
1817                .collect(),
1818            schema_name: metadata.schema_name.clone(),
1819        }
1820    }
1821}
1822
1823struct PlaceholderClient;
1824
1825#[async_trait]
1826impl ConnectorClient for PlaceholderClient {
1827    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1828        Err(ClientError::Other(format!(
1829            "connector client method '{method}' is not implemented for this provider"
1830        )))
1831    }
1832}
1833
1834#[async_trait]
1835impl Connector for PlaceholderConnector {
1836    fn provider_id(&self) -> &ProviderId {
1837        &self.provider_id
1838    }
1839
1840    fn kinds(&self) -> &[TriggerKind] {
1841        &self.kinds
1842    }
1843
1844    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1845        Ok(())
1846    }
1847
1848    async fn activate(
1849        &self,
1850        bindings: &[TriggerBinding],
1851    ) -> Result<ActivationHandle, ConnectorError> {
1852        Ok(ActivationHandle::new(
1853            self.provider_id.clone(),
1854            bindings.len(),
1855        ))
1856    }
1857
1858    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1859        Err(ConnectorError::Unsupported(format!(
1860            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1861            self.provider_id.as_str()
1862        )))
1863    }
1864
1865    fn payload_schema(&self) -> ProviderPayloadSchema {
1866        ProviderPayloadSchema::named(self.schema_name.clone())
1867    }
1868
1869    fn client(&self) -> Arc<dyn ConnectorClient> {
1870        Arc::new(PlaceholderClient)
1871    }
1872}
1873
1874pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1875    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1876        *slot.borrow_mut() = clients
1877            .into_iter()
1878            .map(|(provider, client)| (provider.as_str().to_string(), client))
1879            .collect();
1880    });
1881}
1882
1883pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1884    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1885}
1886
1887pub fn clear_active_connector_clients() {
1888    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1889}
1890
#[cfg(test)]
mod tests {
    // Unit tests covering:
    // - connector registration and activation;
    // - rate limiting;
    // - raw inbound parsing;
    // - the builtin-connector catalog policy;
    // - Prometheus metric rendering.

    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client stub that answers every call with `{"method": <method>}`.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            let echo = json!({ "method": method });
            Ok(echo)
        }
    }

    /// Connector stub whose only observable behavior is counting `activate` calls.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        /// Creates a webhook-kind fake for `provider_id`.
        ///
        /// Each activation increments `activate_calls`.
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            let provider_id = ProviderId::from(provider_id);
            let kinds = vec![TriggerKind::from("webhook")];
            Self {
                provider_id,
                kinds,
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            self.kinds.as_slice()
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            let handle = ActivationHandle::new(self.provider_id.clone(), bindings.len());
            Ok(handle)
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(String::from(
                "not needed for registry tests",
            )))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let original = FakeConnector::new("github", Arc::clone(&activate_calls));
        registry.register(Box::new(original)).unwrap();

        // A second connector for the same provider id must be refused.
        let duplicate = FakeConnector::new("github", activate_calls);
        let error = registry.register(Box::new(duplicate)).unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        for (provider, calls) in [("github", &github_calls), ("slack", &slack_calls)] {
            registry
                .register(Box::new(FakeConnector::new(provider, Arc::clone(calls))))
                .unwrap();
        }

        // Only github receives bindings; slack stays registered but unbound.
        let mut trigger_registry = TriggerRegistry::default();
        for event in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                event,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        let handle = &handles[0];
        assert_eq!(handle.provider.as_str(), "github");
        assert_eq!(handle.binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let config = RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        };
        let factory = RateLimiterFactory::new(config);
        let github = ProviderId::from("github");
        let slack = ProviderId::from("slack");

        // The single token for (github, org:1) is spent by the first acquire...
        assert!(factory.try_acquire(&github, "org:1"));
        assert!(!factory.try_acquire(&github, "org:1"));
        // ...while other keys and other providers draw from their own buckets.
        assert!(factory.try_acquire(&github, "org:2"));
        assert!(factory.try_acquire(&slack, "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers = BTreeMap::from([(
            "Content-Type".to_string(),
            "application/json".to_string(),
        )]);
        let body = br#"{"ok":true}"#.to_vec();
        let raw = RawInbound::new("push", headers, body);

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    #[test]
    fn connector_registry_lists_core_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        let listed = |name: &str| providers.contains(&ProviderId::from(name));

        for core in ["cron", "webhook", "kafka"] {
            assert!(listed(core), "expected core provider '{core}' in default registry");
        }
        for service in ["github", "slack"] {
            assert!(!listed(service), "unexpected service provider '{service}' in default registry");
        }
    }

    #[test]
    fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
        // The only providers still allowed to ship as Rust builtin connectors.
        const CORE_RUNTIME_PROVIDERS: [&str; 9] = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        let builtins = registered_provider_metadata()
            .into_iter()
            .filter(|provider| matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }));
        for provider in builtins {
            assert!(
                CORE_RUNTIME_PROVIDERS.contains(&provider.provider.as_str()),
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        // Every series below must appear verbatim in the Prometheus rendering.
        const EXPECTED_SERIES: &[&str] = &[
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ];

        // Labels shared by many of the recorded samples.
        let trigger_id = "github-new-issue";
        let binding_key = "github-new-issue@v7";
        let provider = "github";
        let tenant = Some("tenant-a");
        let pending_topic = "orchestrator.triggers.pending";
        let inbox_topic = "trigger.inbox.envelopes";

        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received(trigger_id, provider);
        metrics.record_trigger_deduped(trigger_id, "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation(trigger_id, true, 0.002);
        metrics.record_trigger_dispatched(trigger_id, "local", "succeeded");
        metrics.record_trigger_retry(trigger_id, 2);
        metrics.record_trigger_dlq(trigger_id, "retry_exhausted");
        metrics.set_trigger_inflight(trigger_id, 0);
        metrics.record_event_log_append(pending_topic, StdDuration::from_millis(1), 2048);
        metrics.set_event_log_consumer_lag(pending_topic, "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today(trigger_id, 0.002);
        metrics.record_trigger_budget_exhausted(trigger_id, "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog(inbox_topic, 2);
        metrics.set_orchestrator_pump_outstanding(inbox_topic, 1);
        metrics.record_orchestrator_pump_admission_delay(inbox_topic, StdDuration::from_millis(50));

        // Per-stage trigger latency histograms.
        // They share one label set and differ in the status label and duration.
        metrics.record_trigger_accepted_to_normalized(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            trigger_id,
            binding_key,
            provider,
            tenant,
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        // The trailing arguments are presumably millisecond timestamps
        // (accepted-at, now). That would give the 3-second oldest-pending age
        // expected above.
        metrics.note_trigger_pending_event(
            "evt-1",
            trigger_id,
            binding_key,
            provider,
            tenant,
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for &needle in EXPECTED_SERIES {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}