// harn_vm/connectors/mod.rs

//! Connector traits and shared helpers for inbound event-source providers.
//!
//! This module lives in `harn-vm` for now because its current dependency
//! surface (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here.
//! If the connector ecosystem grows enough to justify extraction, the module
//! can be split into a dedicated crate later without changing the high-level
//! contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37pub mod stream;
38#[cfg(test)]
39pub(crate) mod test_util;
40pub mod webhook;
41
42pub use a2a_push::A2aPushConnector;
43pub use cron::{CatchupMode, CronConnector};
44pub use github::GitHubConnector;
45pub use harn_module::{
46    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
47};
48pub use hmac::{
49    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
50    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
51    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
52    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
53    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
54    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
55    SIGNATURE_VERIFY_AUDIT_TOPIC,
56};
57pub use linear::LinearConnector;
58pub use notion::{
59    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
60};
61pub use slack::SlackConnector;
62pub use stream::StreamConnector;
63use webhook::WebhookProviderProfile;
64pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
65
/// Shared, owned handle to a connector instance registered with the runtime.
///
/// The boxed [`Connector`] sits behind a Tokio async mutex inside an [`Arc`]:
/// cloning the handle yields another reference to the same instance, and the
/// mutex must be locked before calling into the connector.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
68
thread_local! {
    // Per-thread table of outbound connector clients, keyed by a string.
    // Starts empty on every thread.
    // NOTE(review): the key is presumably the provider id — confirm against
    // the code that populates this map (not visible in this part of the file).
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
73
74/// Provider implementation contract for inbound connectors.
75#[async_trait]
76pub trait Connector: Send + Sync {
77    /// Stable provider id such as `github`, `slack`, or `webhook`.
78    fn provider_id(&self) -> &ProviderId;
79
80    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
81    fn kinds(&self) -> &[TriggerKind];
82
83    /// Called once per connector instance at orchestrator startup.
84    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
85
86    /// Activate the bindings relevant to this connector instance.
87    async fn activate(
88        &self,
89        bindings: &[TriggerBinding],
90    ) -> Result<ActivationHandle, ConnectorError>;
91
92    /// Stop connector-owned background work and flush any connector-local state.
93    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
94        Ok(())
95    }
96
97    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
98    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
99
100    /// Verify + normalize a provider-native inbound request into the richer
101    /// connector result contract used by ack-first webhook adapters.
102    async fn normalize_inbound_result(
103        &self,
104        raw: RawInbound,
105    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
106        self.normalize_inbound(raw)
107            .await
108            .map(ConnectorNormalizeResult::event)
109    }
110
111    /// Payload schema surfaced to future trigger-type narrowing.
112    fn payload_schema(&self) -> ProviderPayloadSchema;
113
114    /// Outbound API wrapper exposed to handlers.
115    fn client(&self) -> Arc<dyn ConnectorClient>;
116}
117
118/// Provider-supplied HTTP response returned before or instead of trigger dispatch.
119#[derive(Clone, Debug, PartialEq)]
120pub struct ConnectorHttpResponse {
121    pub status: u16,
122    pub headers: BTreeMap<String, String>,
123    pub body: JsonValue,
124}
125
126impl ConnectorHttpResponse {
127    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
128        Self {
129            status,
130            headers,
131            body,
132        }
133    }
134}
135
136/// Normalized inbound result accepted by the runtime connector adapter.
137#[derive(Clone, Debug, PartialEq)]
138pub enum ConnectorNormalizeResult {
139    Event(Box<TriggerEvent>),
140    Batch(Vec<TriggerEvent>),
141    ImmediateResponse {
142        response: ConnectorHttpResponse,
143        events: Vec<TriggerEvent>,
144    },
145    Reject(ConnectorHttpResponse),
146}
147
148impl ConnectorNormalizeResult {
149    pub fn event(event: TriggerEvent) -> Self {
150        Self::Event(Box::new(event))
151    }
152
153    pub fn into_events(self) -> Vec<TriggerEvent> {
154        match self {
155            Self::Event(event) => vec![*event],
156            Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
157            Self::Reject(_) => Vec::new(),
158        }
159    }
160}
161
162#[derive(Clone, Debug, PartialEq)]
163pub enum PostNormalizeOutcome {
164    Ready(Box<TriggerEvent>),
165    DuplicateDropped,
166}
167
168pub async fn postprocess_normalized_event(
169    inbox: &InboxIndex,
170    binding_id: &str,
171    dedupe_enabled: bool,
172    dedupe_ttl: StdDuration,
173    mut event: TriggerEvent,
174) -> Result<PostNormalizeOutcome, ConnectorError> {
175    if dedupe_enabled && !event.dedupe_claimed() {
176        if !inbox
177            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
178            .await?
179        {
180            return Ok(PostNormalizeOutcome::DuplicateDropped);
181        }
182        event.mark_dedupe_claimed();
183    }
184
185    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
186}
187
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes the provider operation named `method` with JSON `args` and
    /// returns its JSON result, or a [`ClientError`] describing the failure.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
193
/// Minimal set of outbound client failures shared by connector implementations.
///
/// Every variant carries a human-readable message; the variant itself tells
/// callers which kind of failure occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    Other(String),
}

impl fmt::Display for ClientError {
    /// Writes only the carried message; the variant name is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &String = match self {
            Self::MethodNotFound(text) => text,
            Self::InvalidArgs(text) => text,
            Self::RateLimited(text) => text,
            Self::Transport(text) => text,
            Self::Other(text) => text,
        };
        // Delegate to `String`'s `Display` so width/padding flags still apply.
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for ClientError {}
217
218/// Shared connector-layer errors.
219#[derive(Debug)]
220pub enum ConnectorError {
221    DuplicateProvider(String),
222    DuplicateDelivery(String),
223    UnknownProvider(String),
224    MissingHeader(String),
225    InvalidHeader {
226        name: String,
227        detail: String,
228    },
229    InvalidSignature(String),
230    TimestampOutOfWindow {
231        timestamp: OffsetDateTime,
232        now: OffsetDateTime,
233        window: time::Duration,
234    },
235    Json(String),
236    Secret(String),
237    EventLog(String),
238    HarnRuntime(String),
239    Client(ClientError),
240    Unsupported(String),
241    Activation(String),
242}
243
244impl ConnectorError {
245    pub fn invalid_signature(message: impl Into<String>) -> Self {
246        Self::InvalidSignature(message.into())
247    }
248}
249
250impl fmt::Display for ConnectorError {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        match self {
253            Self::DuplicateProvider(provider) => {
254                write!(f, "connector provider `{provider}` is already registered")
255            }
256            Self::DuplicateDelivery(message) => message.fmt(f),
257            Self::UnknownProvider(provider) => {
258                write!(f, "connector provider `{provider}` is not registered")
259            }
260            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
261            Self::InvalidHeader { name, detail } => {
262                write!(f, "invalid header `{name}`: {detail}")
263            }
264            Self::InvalidSignature(message)
265            | Self::Json(message)
266            | Self::Secret(message)
267            | Self::EventLog(message)
268            | Self::HarnRuntime(message)
269            | Self::Unsupported(message)
270            | Self::Activation(message) => message.fmt(f),
271            Self::TimestampOutOfWindow {
272                timestamp,
273                now,
274                window,
275            } => write!(
276                f,
277                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
278            ),
279            Self::Client(error) => error.fmt(f),
280        }
281    }
282}
283
284impl std::error::Error for ConnectorError {}
285
286impl From<crate::event_log::LogError> for ConnectorError {
287    fn from(value: crate::event_log::LogError) -> Self {
288        Self::EventLog(value.to_string())
289    }
290}
291
292impl From<crate::secrets::SecretError> for ConnectorError {
293    fn from(value: crate::secrets::SecretError) -> Self {
294        Self::Secret(value.to_string())
295    }
296}
297
298impl From<serde_json::Error> for ConnectorError {
299    fn from(value: serde_json::Error) -> Self {
300        Self::Json(value.to_string())
301    }
302}
303
304impl From<ClientError> for ConnectorError {
305    fn from(value: ClientError) -> Self {
306        Self::Client(value)
307    }
308}
309
/// Startup context shared with connector instances.
///
/// Every field is an `Arc`, so the derived `Clone` only bumps reference counts
/// and all clones share the same underlying services.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Shared event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Shared secret provider.
    pub secrets: Arc<dyn SecretProvider>,
    /// Shared inbox index (the type used for dedupe claims in
    /// `postprocess_normalized_event`).
    pub inbox: Arc<InboxIndex>,
    /// Shared connector metrics registry.
    pub metrics: Arc<MetricsRegistry>,
    /// Shared rate limiter factory.
    /// NOTE(review): `RateLimiterFactory` is defined outside the visible part
    /// of this file — see its definition for what it hands out.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
319
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Produced by `MetricsRegistry::snapshot`, which copies the current value of
/// the registry's atomic counter of the same name into each field. The fields
/// are read one at a time, so the snapshot is not an atomic view of all
/// counters together.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    /// Bumped for both fast-path and durable duplicate hits.
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    /// Last value stored via `set_inbox_active_entries` (overwritten, not
    /// accumulated).
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
336
/// Label set attached to a metric sample: label name -> label value, kept in
/// sorted name order by the `BTreeMap`.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative observation counts keyed by the formatted upper bound
    /// (including `"+Inf"`). Because the key is a string, map iteration order
    /// is lexicographic rather than numeric.
    buckets: BTreeMap<String, u64>,
    /// Total number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
345
346static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
347
348pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
349    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
350    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
351}
352
353pub fn clear_active_metrics_registry() {
354    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
355        *slot.lock().expect("active metrics registry poisoned") = None;
356    }
357}
358
359pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
360    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
361        slot.lock()
362            .expect("active metrics registry poisoned")
363            .clone()
364    })
365}
366
/// Shared metrics surface for connector-local counters and timings.
///
/// Two kinds of state live here: fixed, named counters stored as atomics, and
/// open-ended labelled metrics stored in mutex-guarded maps.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed counters; copied out by `snapshot()`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Written with `store`, i.e. holds the latest value rather than a running total.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Unlabelled counters keyed by caller-supplied name.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled metrics keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
}
387
388impl MetricsRegistry {
389    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
390    const SIZE_BUCKETS: [f64; 9] = [
391        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
392    ];
393
394    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
395        ConnectorMetricsSnapshot {
396            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
397            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
398            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
399            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
400            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
401            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
402            linear_timestamp_rejections_total: self
403                .linear_timestamp_rejections_total
404                .load(Ordering::Relaxed),
405            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
406            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
407            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
408            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
409            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
410        }
411    }
412
413    pub(crate) fn record_inbox_claim(&self) {
414        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
415    }
416
417    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
418        self.inbox_duplicates_rejected
419            .fetch_add(1, Ordering::Relaxed);
420        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
421    }
422
423    pub(crate) fn record_inbox_duplicate_durable(&self) {
424        self.inbox_duplicates_rejected
425            .fetch_add(1, Ordering::Relaxed);
426        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
427    }
428
429    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
430        if count > 0 {
431            self.inbox_expired_entries
432                .fetch_add(count, Ordering::Relaxed);
433        }
434    }
435
436    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
437        self.inbox_active_entries
438            .store(count as u64, Ordering::Relaxed);
439    }
440
441    pub fn record_linear_timestamp_rejection(&self) {
442        self.linear_timestamp_rejections_total
443            .fetch_add(1, Ordering::Relaxed);
444    }
445
446    pub fn record_dispatch_succeeded(&self) {
447        self.dispatch_succeeded_total
448            .fetch_add(1, Ordering::Relaxed);
449    }
450
451    pub fn record_dispatch_failed(&self) {
452        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
453    }
454
455    pub fn record_retry_scheduled(&self) {
456        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
457    }
458
459    pub fn record_slack_delivery_success(&self) {
460        self.slack_delivery_success_total
461            .fetch_add(1, Ordering::Relaxed);
462    }
463
464    pub fn record_slack_delivery_failure(&self) {
465        self.slack_delivery_failure_total
466            .fetch_add(1, Ordering::Relaxed);
467    }
468
469    pub fn record_custom_counter(&self, name: &str, amount: u64) {
470        if amount == 0 {
471            return;
472        }
473        let mut counters = self
474            .custom_counters
475            .lock()
476            .expect("custom counters poisoned");
477        *counters.entry(name.to_string()).or_default() += amount;
478    }
479
480    pub fn record_http_request(
481        &self,
482        endpoint: &str,
483        method: &str,
484        status: u16,
485        duration: StdDuration,
486        body_size_bytes: usize,
487    ) {
488        self.increment_counter(
489            "harn_http_requests_total",
490            labels([
491                ("endpoint", endpoint),
492                ("method", method),
493                ("status", &status.to_string()),
494            ]),
495            1,
496        );
497        self.observe_histogram(
498            "harn_http_request_duration_seconds",
499            labels([("endpoint", endpoint)]),
500            duration.as_secs_f64(),
501            &Self::DURATION_BUCKETS,
502        );
503        self.observe_histogram(
504            "harn_http_body_size_bytes",
505            labels([("endpoint", endpoint)]),
506            body_size_bytes as f64,
507            &Self::SIZE_BUCKETS,
508        );
509    }
510
511    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
512        self.increment_counter(
513            "harn_trigger_received_total",
514            labels([("trigger_id", trigger_id), ("provider", provider)]),
515            1,
516        );
517    }
518
519    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
520        self.increment_counter(
521            "harn_trigger_deduped_total",
522            labels([("trigger_id", trigger_id), ("reason", reason)]),
523            1,
524        );
525    }
526
527    pub fn record_trigger_predicate_evaluation(
528        &self,
529        trigger_id: &str,
530        result: bool,
531        cost_usd: f64,
532    ) {
533        self.increment_counter(
534            "harn_trigger_predicate_evaluations_total",
535            labels([
536                ("trigger_id", trigger_id),
537                ("result", if result { "true" } else { "false" }),
538            ]),
539            1,
540        );
541        self.observe_histogram(
542            "harn_trigger_predicate_cost_usd",
543            labels([("trigger_id", trigger_id)]),
544            cost_usd.max(0.0),
545            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
546        );
547    }
548
549    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
550        self.increment_counter(
551            "harn_trigger_dispatched_total",
552            labels([
553                ("trigger_id", trigger_id),
554                ("handler_kind", handler_kind),
555                ("outcome", outcome),
556            ]),
557            1,
558        );
559    }
560
561    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
562        self.increment_counter(
563            "harn_trigger_retries_total",
564            labels([
565                ("trigger_id", trigger_id),
566                ("attempt", &attempt.to_string()),
567            ]),
568            1,
569        );
570    }
571
572    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
573        self.increment_counter(
574            "harn_trigger_dlq_total",
575            labels([("trigger_id", trigger_id), ("reason", reason)]),
576            1,
577        );
578    }
579
580    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
581        self.set_gauge(
582            "harn_trigger_inflight",
583            labels([("trigger_id", trigger_id)]),
584            count as f64,
585        );
586    }
587
588    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
589        self.set_gauge(
590            "harn_trigger_budget_cost_today_usd",
591            labels([("trigger_id", trigger_id)]),
592            cost_usd.max(0.0),
593        );
594    }
595
596    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
597        self.increment_counter(
598            "harn_trigger_budget_exhausted_total",
599            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
600            1,
601        );
602    }
603
604    pub fn record_event_log_append(
605        &self,
606        topic: &str,
607        duration: StdDuration,
608        payload_bytes: usize,
609    ) {
610        self.observe_histogram(
611            "harn_event_log_append_duration_seconds",
612            labels([("topic", topic)]),
613            duration.as_secs_f64(),
614            &Self::DURATION_BUCKETS,
615        );
616        self.set_gauge(
617            "harn_event_log_topic_size_bytes",
618            labels([("topic", topic)]),
619            payload_bytes as f64,
620        );
621    }
622
623    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
624        self.set_gauge(
625            "harn_event_log_consumer_lag",
626            labels([("topic", topic), ("consumer", consumer)]),
627            lag as f64,
628        );
629    }
630
631    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
632        self.increment_counter(
633            "harn_a2a_hops_total",
634            labels([("target", target), ("outcome", outcome)]),
635            1,
636        );
637        self.observe_histogram(
638            "harn_a2a_hop_duration_seconds",
639            labels([("target", target)]),
640            duration.as_secs_f64(),
641            &Self::DURATION_BUCKETS,
642        );
643    }
644
645    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
646        self.set_gauge(
647            "harn_worker_queue_depth",
648            labels([("queue", queue)]),
649            depth as f64,
650        );
651    }
652
653    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
654        self.observe_histogram(
655            "harn_worker_queue_claim_age_seconds",
656            labels([("queue", queue)]),
657            age_seconds.max(0.0),
658            &Self::DURATION_BUCKETS,
659        );
660    }
661
662    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
663        self.set_gauge(
664            "harn_orchestrator_pump_backlog",
665            labels([("topic", topic)]),
666            count as f64,
667        );
668    }
669
670    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
671        self.set_gauge(
672            "harn_orchestrator_pump_outstanding",
673            labels([("topic", topic)]),
674            count as f64,
675        );
676    }
677
678    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
679        self.observe_histogram(
680            "harn_orchestrator_pump_admission_delay_seconds",
681            labels([("topic", topic)]),
682            duration.as_secs_f64(),
683            &Self::DURATION_BUCKETS,
684        );
685    }
686
687    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
688        self.increment_counter(
689            "harn_llm_calls_total",
690            labels([
691                ("provider", provider),
692                ("model", model),
693                ("outcome", outcome),
694            ]),
695            1,
696        );
697        if cost_usd > 0.0 {
698            self.increment_counter(
699                "harn_llm_cost_usd_total",
700                labels([("provider", provider), ("model", model)]),
701                cost_usd,
702            );
703        } else {
704            self.ensure_counter(
705                "harn_llm_cost_usd_total",
706                labels([("provider", provider), ("model", model)]),
707            );
708        }
709    }
710
711    pub fn record_llm_cache_hit(&self, provider: &str) {
712        self.increment_counter(
713            "harn_llm_cache_hits_total",
714            labels([("provider", provider)]),
715            1,
716        );
717    }
718
719    pub fn render_prometheus(&self) -> String {
720        let snapshot = self.snapshot();
721        let counters = [
722            (
723                "connector_linear_timestamp_rejections_total",
724                snapshot.linear_timestamp_rejections_total,
725            ),
726            (
727                "dispatch_succeeded_total",
728                snapshot.dispatch_succeeded_total,
729            ),
730            ("dispatch_failed_total", snapshot.dispatch_failed_total),
731            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
732            ("retry_scheduled_total", snapshot.retry_scheduled_total),
733            (
734                "slack_events_delivery_success_total",
735                snapshot.slack_delivery_success_total,
736            ),
737            (
738                "slack_events_delivery_failure_total",
739                snapshot.slack_delivery_failure_total,
740            ),
741        ];
742
743        let mut rendered = String::new();
744        for (name, value) in counters {
745            rendered.push_str("# TYPE ");
746            rendered.push_str(name);
747            rendered.push_str(" counter\n");
748            rendered.push_str(name);
749            rendered.push(' ');
750            rendered.push_str(&value.to_string());
751            rendered.push('\n');
752        }
753        let custom_counters = self
754            .custom_counters
755            .lock()
756            .expect("custom counters poisoned");
757        for (name, value) in custom_counters.iter() {
758            let metric_name = format!(
759                "connector_custom_{}_total",
760                name.chars()
761                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
762                        ch
763                    } else {
764                        '_'
765                    })
766                    .collect::<String>()
767            );
768            rendered.push_str("# TYPE ");
769            rendered.push_str(&metric_name);
770            rendered.push_str(" counter\n");
771            rendered.push_str(&metric_name);
772            rendered.push(' ');
773            rendered.push_str(&value.to_string());
774            rendered.push('\n');
775        }
776        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
777        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
778        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
779        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
780        self.render_generic_metrics(&mut rendered);
781        rendered
782    }
783
784    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
785        let amount = amount.into();
786        if amount <= 0.0 || !amount.is_finite() {
787            return;
788        }
789        let mut counters = self.counters.lock().expect("metrics counters poisoned");
790        *counters.entry((name.to_string(), labels)).or_default() += amount;
791    }
792
793    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
794        let mut counters = self.counters.lock().expect("metrics counters poisoned");
795        counters.entry((name.to_string(), labels)).or_default();
796    }
797
798    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
799        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
800        gauges.insert((name.to_string(), labels), value);
801    }
802
803    fn observe_histogram(
804        &self,
805        name: &str,
806        labels: MetricLabels,
807        value: f64,
808        bucket_bounds: &[f64],
809    ) {
810        if !value.is_finite() {
811            return;
812        }
813        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
814        let histogram = histograms
815            .entry((name.to_string(), labels))
816            .or_insert_with(|| HistogramMetric {
817                buckets: bucket_bounds
818                    .iter()
819                    .map(|bound| (prometheus_float(*bound), 0))
820                    .chain(std::iter::once(("+Inf".to_string(), 0)))
821                    .collect(),
822                count: 0,
823                sum: 0.0,
824            });
825        histogram.count += 1;
826        histogram.sum += value;
827        for bound in bucket_bounds {
828            if value <= *bound {
829                let key = prometheus_float(*bound);
830                *histogram.buckets.entry(key).or_default() += 1;
831            }
832        }
833        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
834    }
835
836    fn render_generic_metrics(&self, rendered: &mut String) {
837        let counters = self
838            .counters
839            .lock()
840            .expect("metrics counters poisoned")
841            .clone();
842        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
843        let histograms = self
844            .histograms
845            .lock()
846            .expect("metrics histograms poisoned")
847            .clone();
848
849        for name in metric_family_names(MetricKind::Counter) {
850            rendered.push_str("# TYPE ");
851            rendered.push_str(name);
852            rendered.push_str(" counter\n");
853            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
854                render_sample(rendered, sample_name, labels, *value);
855            }
856        }
857        for name in metric_family_names(MetricKind::Gauge) {
858            rendered.push_str("# TYPE ");
859            rendered.push_str(name);
860            rendered.push_str(" gauge\n");
861            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
862                render_sample(rendered, sample_name, labels, *value);
863            }
864        }
865        for name in metric_family_names(MetricKind::Histogram) {
866            rendered.push_str("# TYPE ");
867            rendered.push_str(name);
868            rendered.push_str(" histogram\n");
869            for ((sample_name, labels), histogram) in
870                histograms.iter().filter(|((n, _), _)| n == name)
871            {
872                for (le, value) in &histogram.buckets {
873                    let mut bucket_labels = labels.clone();
874                    bucket_labels.insert("le".to_string(), le.clone());
875                    render_sample(
876                        rendered,
877                        &format!("{sample_name}_bucket"),
878                        &bucket_labels,
879                        *value as f64,
880                    );
881                }
882                render_sample(
883                    rendered,
884                    &format!("{sample_name}_sum"),
885                    labels,
886                    histogram.sum,
887                );
888                render_sample(
889                    rendered,
890                    &format!("{sample_name}_count"),
891                    labels,
892                    histogram.count as f64,
893                );
894            }
895        }
896    }
897}
898
/// The metric types that the generic renderer groups families by.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Returns the declared metric family names for `kind`.
///
/// The slice order is the order in which families are written out by the
/// renderer that iterates over it.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTER_FAMILIES: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
    ];
    const GAUGE_FAMILIES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
    ];
    const HISTOGRAM_FAMILIES: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
    ];

    match kind {
        MetricKind::Counter => COUNTER_FAMILIES,
        MetricKind::Gauge => GAUGE_FAMILIES,
        MetricKind::Histogram => HISTOGRAM_FAMILIES,
    }
}
942
943fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
944    pairs
945        .into_iter()
946        .map(|(name, value)| (name.to_string(), value.to_string()))
947        .collect()
948}
949
950fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
951    rendered.push_str(name);
952    if !labels.is_empty() {
953        rendered.push('{');
954        for (index, (label, label_value)) in labels.iter().enumerate() {
955            if index > 0 {
956                rendered.push(',');
957            }
958            rendered.push_str(label);
959            rendered.push_str("=\"");
960            rendered.push_str(&escape_label_value(label_value));
961            rendered.push('"');
962        }
963        rendered.push('}');
964    }
965    rendered.push(' ');
966    rendered.push_str(&prometheus_float(value));
967    rendered.push('\n');
968}
969
/// Escapes a label value for use inside double quotes: a backslash becomes
/// `\\`, a double quote becomes `\"`, and a line feed becomes `\n`. All other
/// characters are copied through unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
981
/// Formats a sample value for the Prometheus text exposition format.
///
/// * Non-finite values use the spellings the format defines: `+Inf`, `-Inf`
///   and `NaN`. Negative infinity previously fell through to the
///   fixed-precision branch and was written as `-inf`.
/// * Whole numbers are written without a fractional part (`2048`, not `2048.0`).
/// * Everything else is rounded to six decimal places, after which trailing
///   zeros and a dangling decimal point are removed (`0.002000` -> `0.002`).
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() {
            "+Inf"
        } else {
            "-Inf"
        };
        return spelled.to_string();
    }

    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
996
997/// Provider payload schema metadata exposed by a connector.
998#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
999pub struct ProviderPayloadSchema {
1000    pub harn_schema_name: String,
1001    #[serde(default)]
1002    pub json_schema: JsonValue,
1003}
1004
1005impl ProviderPayloadSchema {
1006    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1007        Self {
1008            harn_schema_name: harn_schema_name.into(),
1009            json_schema,
1010        }
1011    }
1012
1013    pub fn named(harn_schema_name: impl Into<String>) -> Self {
1014        Self::new(harn_schema_name, JsonValue::Null)
1015    }
1016}
1017
1018impl Default for ProviderPayloadSchema {
1019    fn default() -> Self {
1020        Self::named("raw")
1021    }
1022}
1023
1024/// High-level transport kind a connector supports.
1025#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1026#[serde(transparent)]
1027pub struct TriggerKind(String);
1028
1029impl TriggerKind {
1030    pub fn new(value: impl Into<String>) -> Self {
1031        Self(value.into())
1032    }
1033
1034    pub fn as_str(&self) -> &str {
1035        self.0.as_str()
1036    }
1037}
1038
1039impl From<&str> for TriggerKind {
1040    fn from(value: &str) -> Self {
1041        Self::new(value)
1042    }
1043}
1044
1045impl From<String> for TriggerKind {
1046    fn from(value: String) -> Self {
1047        Self::new(value)
1048    }
1049}
1050
1051/// Future trigger manifest binding routed to a connector activation.
1052#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1053pub struct TriggerBinding {
1054    pub provider: ProviderId,
1055    pub kind: TriggerKind,
1056    pub binding_id: String,
1057    #[serde(default)]
1058    pub dedupe_key: Option<String>,
1059    #[serde(default = "default_dedupe_retention_days")]
1060    pub dedupe_retention_days: u32,
1061    #[serde(default)]
1062    pub config: JsonValue,
1063}
1064
1065impl TriggerBinding {
1066    pub fn new(
1067        provider: ProviderId,
1068        kind: impl Into<TriggerKind>,
1069        binding_id: impl Into<String>,
1070    ) -> Self {
1071        Self {
1072            provider,
1073            kind: kind.into(),
1074            binding_id: binding_id.into(),
1075            dedupe_key: None,
1076            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1077            config: JsonValue::Null,
1078        }
1079    }
1080}
1081
1082fn default_dedupe_retention_days() -> u32 {
1083    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1084}
1085
1086/// Small in-memory trigger-binding registry used to fan bindings into connectors.
1087#[derive(Clone, Debug, Default)]
1088pub struct TriggerRegistry {
1089    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1090}
1091
1092impl TriggerRegistry {
1093    pub fn register(&mut self, binding: TriggerBinding) {
1094        self.bindings
1095            .entry(binding.provider.clone())
1096            .or_default()
1097            .push(binding);
1098    }
1099
1100    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1101        &self.bindings
1102    }
1103
1104    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1105        self.bindings
1106            .get(provider)
1107            .map(Vec::as_slice)
1108            .unwrap_or(&[])
1109    }
1110}
1111
1112/// Metadata returned from connector activation.
1113#[derive(Clone, Debug, PartialEq, Eq)]
1114pub struct ActivationHandle {
1115    pub provider: ProviderId,
1116    pub binding_count: usize,
1117}
1118
1119impl ActivationHandle {
1120    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1121        Self {
1122            provider,
1123            binding_count,
1124        }
1125    }
1126}
1127
1128/// Provider-native inbound request payload preserved as raw bytes.
1129#[derive(Clone, Debug, PartialEq)]
1130pub struct RawInbound {
1131    pub kind: String,
1132    pub headers: BTreeMap<String, String>,
1133    pub query: BTreeMap<String, String>,
1134    pub body: Vec<u8>,
1135    pub received_at: OffsetDateTime,
1136    pub occurred_at: Option<OffsetDateTime>,
1137    pub tenant_id: Option<TenantId>,
1138    pub metadata: JsonValue,
1139}
1140
1141impl RawInbound {
1142    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1143        Self {
1144            kind: kind.into(),
1145            headers,
1146            query: BTreeMap::new(),
1147            body,
1148            received_at: clock::now_utc(),
1149            occurred_at: None,
1150            tenant_id: None,
1151            metadata: JsonValue::Null,
1152        }
1153    }
1154
1155    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1156        Ok(serde_json::from_slice(&self.body)?)
1157    }
1158}
1159
/// Token-bucket configuration shared across connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens a bucket can hold.
    pub capacity: u32,
    /// Tokens added per `refill_interval`.
    pub refill_tokens: u32,
    /// Length of one refill period.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Defaults to a 60-token bucket refilled by one token every second.
    fn default() -> Self {
        const DEFAULT_CAPACITY: u32 = 60;
        const DEFAULT_REFILL_TOKENS: u32 = 1;
        const DEFAULT_REFILL_INTERVAL: StdDuration = StdDuration::from_secs(1);

        RateLimitConfig {
            capacity: DEFAULT_CAPACITY,
            refill_tokens: DEFAULT_REFILL_TOKENS,
            refill_interval: DEFAULT_REFILL_INTERVAL,
        }
    }
}
1177
1178#[derive(Clone, Debug)]
1179struct TokenBucket {
1180    tokens: f64,
1181    last_refill: ClockInstant,
1182}
1183
1184impl TokenBucket {
1185    fn full(config: RateLimitConfig) -> Self {
1186        Self {
1187            tokens: config.capacity as f64,
1188            last_refill: clock::instant_now(),
1189        }
1190    }
1191
1192    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1193        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1194        let rate = config.refill_tokens.max(1) as f64 / interval;
1195        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1196        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1197        self.last_refill = now;
1198    }
1199
1200    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1201        self.refill(config, now);
1202        if self.tokens >= 1.0 {
1203            self.tokens -= 1.0;
1204            true
1205        } else {
1206            false
1207        }
1208    }
1209
1210    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1211        if self.tokens >= 1.0 {
1212            return StdDuration::ZERO;
1213        }
1214        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1215        let rate = config.refill_tokens.max(1) as f64 / interval;
1216        let missing = (1.0 - self.tokens).max(0.0);
1217        StdDuration::from_secs_f64((missing / rate).max(0.001))
1218    }
1219}
1220
1221/// Shared per-provider, per-key token bucket factory for outbound connector clients.
1222#[derive(Debug)]
1223pub struct RateLimiterFactory {
1224    config: RateLimitConfig,
1225    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1226}
1227
1228impl RateLimiterFactory {
1229    pub fn new(config: RateLimitConfig) -> Self {
1230        Self {
1231            config,
1232            buckets: Mutex::new(HashMap::new()),
1233        }
1234    }
1235
1236    pub fn config(&self) -> RateLimitConfig {
1237        self.config
1238    }
1239
1240    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1241        ScopedRateLimiter {
1242            factory: self,
1243            provider: provider.clone(),
1244            key: key.into(),
1245        }
1246    }
1247
1248    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1249        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1250        let bucket = buckets
1251            .entry((provider.as_str().to_string(), key.to_string()))
1252            .or_insert_with(|| TokenBucket::full(self.config));
1253        bucket.try_acquire(self.config, clock::instant_now())
1254    }
1255
1256    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1257        loop {
1258            let wait = {
1259                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1260                let bucket = buckets
1261                    .entry((provider.as_str().to_string(), key.to_string()))
1262                    .or_insert_with(|| TokenBucket::full(self.config));
1263                if bucket.try_acquire(self.config, clock::instant_now()) {
1264                    return;
1265                }
1266                bucket.wait_duration(self.config)
1267            };
1268            tokio::time::sleep(wait).await;
1269        }
1270    }
1271}
1272
1273impl Default for RateLimiterFactory {
1274    fn default() -> Self {
1275        Self::new(RateLimitConfig::default())
1276    }
1277}
1278
1279/// Borrowed view onto a single provider/key rate-limit scope.
1280#[derive(Clone, Debug)]
1281pub struct ScopedRateLimiter<'a> {
1282    factory: &'a RateLimiterFactory,
1283    provider: ProviderId,
1284    key: String,
1285}
1286
1287impl<'a> ScopedRateLimiter<'a> {
1288    pub fn try_acquire(&self) -> bool {
1289        self.factory.try_acquire(&self.provider, &self.key)
1290    }
1291
1292    pub async fn acquire(&self) {
1293        self.factory.acquire(&self.provider, &self.key).await;
1294    }
1295}
1296
1297/// Runtime connector registry keyed by provider id.
1298pub struct ConnectorRegistry {
1299    connectors: BTreeMap<ProviderId, ConnectorHandle>,
1300}
1301
1302impl ConnectorRegistry {
1303    pub fn empty() -> Self {
1304        Self {
1305            connectors: BTreeMap::new(),
1306        }
1307    }
1308
1309    pub fn with_defaults() -> Self {
1310        let mut registry = Self::empty();
1311        for provider in registered_provider_metadata() {
1312            registry
1313                .register(default_connector_for_provider(&provider))
1314                .expect("default connector registration should not fail");
1315        }
1316        registry
1317    }
1318
1319    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1320        let provider = connector.provider_id().clone();
1321        if self.connectors.contains_key(&provider) {
1322            return Err(ConnectorError::DuplicateProvider(provider.0));
1323        }
1324        self.connectors
1325            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1326        Ok(())
1327    }
1328
1329    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1330        self.connectors.get(id).cloned()
1331    }
1332
1333    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1334        self.connectors.remove(id)
1335    }
1336
1337    pub fn list(&self) -> Vec<ProviderId> {
1338        self.connectors.keys().cloned().collect()
1339    }
1340
1341    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1342        for connector in self.connectors.values() {
1343            connector.lock().await.init(ctx.clone()).await?;
1344        }
1345        Ok(())
1346    }
1347
1348    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1349        let mut clients = BTreeMap::new();
1350        for (provider, connector) in &self.connectors {
1351            let client = connector.lock().await.client();
1352            clients.insert(provider.clone(), client);
1353        }
1354        clients
1355    }
1356
1357    pub async fn activate_all(
1358        &self,
1359        registry: &TriggerRegistry,
1360    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1361        let mut handles = Vec::new();
1362        for (provider, connector) in &self.connectors {
1363            let bindings = registry.bindings_for(provider);
1364            if bindings.is_empty() {
1365                continue;
1366            }
1367            let connector = connector.lock().await;
1368            handles.push(connector.activate(bindings).await?);
1369        }
1370        Ok(handles)
1371    }
1372}
1373
1374impl Default for ConnectorRegistry {
1375    fn default() -> Self {
1376        Self::with_defaults()
1377    }
1378}
1379
1380fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1381    // The provider catalog on main registers `github` with
1382    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1383    // before a native connector existed the catalog auto-wired a
1384    // GenericWebhookConnector. Now that #170 lands a first-class
1385    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1386    // provider_id "github" here and return the native connector instead of a
1387    // webhook-backed fallback. This keeps manifests that say
1388    // `provider = "github"` pointed at the new connector without requiring
1389    // users to switch to a distinct provider_id.
1390    if provider.provider == "github" {
1391        return Box::new(GitHubConnector::new());
1392    }
1393    if provider.provider == "linear" {
1394        return Box::new(LinearConnector::new());
1395    }
1396    if provider.provider == "slack" {
1397        return Box::new(SlackConnector::new());
1398    }
1399    if provider.provider == "notion" {
1400        return Box::new(NotionConnector::new());
1401    }
1402    if provider.provider == "a2a-push" {
1403        return Box::new(A2aPushConnector::new());
1404    }
1405    match &provider.runtime {
1406        ProviderRuntimeMetadata::Builtin {
1407            connector,
1408            default_signature_variant,
1409        } => match connector.as_str() {
1410            "cron" => Box::new(CronConnector::new()),
1411            "stream" => Box::new(StreamConnector::new(
1412                ProviderId::from(provider.provider.clone()),
1413                provider.schema_name.clone(),
1414            )),
1415            "webhook" => {
1416                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1417                    .expect("catalog webhook signature variant must be valid");
1418                Box::new(GenericWebhookConnector::with_profile(
1419                    WebhookProviderProfile::new(
1420                        ProviderId::from(provider.provider.clone()),
1421                        provider.schema_name.clone(),
1422                        variant,
1423                    ),
1424                ))
1425            }
1426            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1427        },
1428        ProviderRuntimeMetadata::Placeholder => {
1429            Box::new(PlaceholderConnector::from_metadata(provider))
1430        }
1431    }
1432}
1433
1434struct PlaceholderConnector {
1435    provider_id: ProviderId,
1436    kinds: Vec<TriggerKind>,
1437    schema_name: String,
1438}
1439
1440impl PlaceholderConnector {
1441    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1442        Self {
1443            provider_id: ProviderId::from(metadata.provider.clone()),
1444            kinds: metadata
1445                .kinds
1446                .iter()
1447                .cloned()
1448                .map(TriggerKind::from)
1449                .collect(),
1450            schema_name: metadata.schema_name.clone(),
1451        }
1452    }
1453}
1454
1455struct PlaceholderClient;
1456
1457#[async_trait]
1458impl ConnectorClient for PlaceholderClient {
1459    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1460        Err(ClientError::Other(format!(
1461            "connector client method '{method}' is not implemented for this provider"
1462        )))
1463    }
1464}
1465
1466#[async_trait]
1467impl Connector for PlaceholderConnector {
1468    fn provider_id(&self) -> &ProviderId {
1469        &self.provider_id
1470    }
1471
1472    fn kinds(&self) -> &[TriggerKind] {
1473        &self.kinds
1474    }
1475
1476    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1477        Ok(())
1478    }
1479
1480    async fn activate(
1481        &self,
1482        bindings: &[TriggerBinding],
1483    ) -> Result<ActivationHandle, ConnectorError> {
1484        Ok(ActivationHandle::new(
1485            self.provider_id.clone(),
1486            bindings.len(),
1487        ))
1488    }
1489
1490    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1491        Err(ConnectorError::Unsupported(format!(
1492            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1493            self.provider_id.as_str()
1494        )))
1495    }
1496
1497    fn payload_schema(&self) -> ProviderPayloadSchema {
1498        ProviderPayloadSchema::named(self.schema_name.clone())
1499    }
1500
1501    fn client(&self) -> Arc<dyn ConnectorClient> {
1502        Arc::new(PlaceholderClient)
1503    }
1504}
1505
1506pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1507    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1508        *slot.borrow_mut() = clients
1509            .into_iter()
1510            .map(|(provider, client)| (provider.as_str().to_string(), client))
1511            .collect();
1512    });
1513}
1514
1515pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1516    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1517}
1518
1519pub fn clear_active_connector_clients() {
1520    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1521}
1522
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client stub that echoes the requested method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            let echoed = json!({ "method": method });
            Ok(echoed)
        }
    }

    /// Connector double that counts how many times `activate` is called.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            FakeConnector {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            self.kinds.as_slice()
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            let handle = ActivationHandle::new(self.provider_id.clone(), bindings.len());
            Ok(handle)
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            let reason = "not needed for registry tests".to_string();
            Err(ConnectorError::Unsupported(reason))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    /// Boxes a fake connector for `provider_id` that reports activations to `counter`.
    fn boxed_fake(provider_id: &str, counter: &Arc<AtomicUsize>) -> Box<dyn Connector> {
        Box::new(FakeConnector::new(provider_id, Arc::clone(counter)))
    }

    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let counter = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let first = registry.register(boxed_fake("github", &counter));
        first.unwrap();

        let second = registry.register(boxed_fake("github", &counter));
        let error = second.unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        registry
            .register(boxed_fake("github", &github_calls))
            .unwrap();
        registry.register(boxed_fake("slack", &slack_calls)).unwrap();

        // Only github receives bindings; slack stays unbound.
        let mut trigger_registry = TriggerRegistry::default();
        for binding_id in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                binding_id,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        let activated = &handles[0];
        assert_eq!(activated.provider.as_str(), "github");
        assert_eq!(activated.binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let single_token = RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        };
        let factory = RateLimiterFactory::new(single_token);
        let github = ProviderId::from("github");
        let slack = ProviderId::from("slack");

        assert!(factory.try_acquire(&github, "org:1"));
        assert!(!factory.try_acquire(&github, "org:1"));
        assert!(factory.try_acquire(&github, "org:2"));
        assert!(factory.try_acquire(&slack, "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers =
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]);
        let body = br#"{"ok":true}"#.to_vec();
        let raw = RawInbound::new("push", headers, body);

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    #[test]
    fn connector_registry_lists_catalog_providers() {
        let providers = ConnectorRegistry::default().list();
        for expected in ["cron", "github", "webhook"] {
            assert!(providers.contains(&ProviderId::from(expected)));
        }
    }

    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let trigger = "github-new-issue";
        let pending_topic = "orchestrator.triggers.pending";
        let inbox_topic = "trigger.inbox.envelopes";

        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received(trigger, "github");
        metrics.record_trigger_deduped(trigger, "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation(trigger, true, 0.002);
        metrics.record_trigger_dispatched(trigger, "local", "succeeded");
        metrics.record_trigger_retry(trigger, 2);
        metrics.record_trigger_dlq(trigger, "retry_exhausted");
        metrics.set_trigger_inflight(trigger, 0);
        metrics.record_event_log_append(pending_topic, StdDuration::from_millis(1), 2048);
        metrics.set_event_log_consumer_lag(pending_topic, "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today(trigger, 0.002);
        metrics.record_trigger_budget_exhausted(trigger, "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog(inbox_topic, 2);
        metrics.set_orchestrator_pump_outstanding(inbox_topic, 1);
        metrics.record_orchestrator_pump_admission_delay(inbox_topic, StdDuration::from_millis(50));
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        // One representative sample line per exported family.
        let expected_samples: &[&str] = &[
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ];

        let rendered = metrics.render_prometheus();
        for needle in expected_samples.iter().copied() {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}