Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37#[cfg(test)]
38pub(crate) mod test_util;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use github::GitHubConnector;
44pub use harn_module::{
45    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
46};
47pub use hmac::{
48    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
49    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
50    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
51    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
52    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
53    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
54    SIGNATURE_VERIFY_AUDIT_TOPIC,
55};
56pub use linear::LinearConnector;
57pub use notion::{
58    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
59};
60pub use slack::SlackConnector;
61use webhook::WebhookProviderProfile;
62pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
63
/// Shared owned handle to a connector instance registered with the runtime.
///
/// The connector is stored as a boxed trait object behind a Tokio async mutex;
/// locking it yields the `&mut` access that [`Connector::init`] requires while
/// the handle itself stays cheaply clonable through the `Arc`.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
66
thread_local! {
    // Per-OS-thread map from a string key to an outbound connector client.
    // Each thread sees its own independent map, so a client registered on one
    // thread is not visible from another.
    // NOTE(review): not referenced in the visible part of this module; the key
    // is presumably a provider id — confirm at the call sites.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
71
/// Provider implementation contract for inbound connectors.
///
/// A connector verifies and normalizes provider-native inbound traffic into
/// [`TriggerEvent`]s and exposes an outbound [`ConnectorClient`] to handlers.
/// Implementations must be `Send + Sync` so they can live behind a
/// [`ConnectorHandle`].
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable provider id such as `github`, `slack`, or `webhook`.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
    fn kinds(&self) -> &[TriggerKind];

    /// Called once per connector instance at orchestrator startup.
    ///
    /// Receives the shared [`ConnectorCtx`] (event log, secrets, inbox,
    /// metrics, rate limiter) by value and takes `&mut self`, so the connector
    /// may keep whatever handles it needs.
    ///
    /// # Errors
    /// Returns a [`ConnectorError`] if the connector cannot be initialized.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the bindings relevant to this connector instance.
    ///
    /// # Errors
    /// Returns a [`ConnectorError`] if activation fails.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Stop connector-owned background work and flush any connector-local state.
    ///
    /// The default implementation does nothing, ignores the deadline and
    /// returns `Ok(())`.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
    ///
    /// # Errors
    /// Returns a [`ConnectorError`] when verification or normalization fails.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Payload schema surfaced to future trigger-type narrowing.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Outbound API wrapper exposed to handlers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
104
/// Result of running [`postprocess_normalized_event`] on a normalized event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event passed deduplication (or dedupe was skipped) and may proceed.
    /// NOTE(review): boxed, presumably to keep this enum small.
    Ready(Box<TriggerEvent>),
    /// The event's dedupe key was already present in the inbox; it was dropped.
    DuplicateDropped,
}
110
111pub async fn postprocess_normalized_event(
112    inbox: &InboxIndex,
113    binding_id: &str,
114    dedupe_enabled: bool,
115    dedupe_ttl: StdDuration,
116    mut event: TriggerEvent,
117) -> Result<PostNormalizeOutcome, ConnectorError> {
118    if dedupe_enabled && !event.dedupe_claimed() {
119        if !inbox
120            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
121            .await?
122        {
123            return Ok(PostNormalizeOutcome::DuplicateDropped);
124        }
125        event.mark_dedupe_claimed();
126    }
127
128    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
129}
130
/// Outbound provider client interface used by connector-backed stdlib modules.
///
/// A single dynamic entry point: the operation is selected by name and both
/// arguments and result are JSON values.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invoke the provider operation named `method` with JSON `args`.
    ///
    /// # Errors
    /// Returns a [`ClientError`] describing why the call failed.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
136
/// Minimal outbound client errors shared by connector implementations.
///
/// Every variant carries only a human-readable message; `Display` prints that
/// message alone, without the variant name.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// Presumably: the requested method is not offered by this client.
    MethodNotFound(String),
    /// Presumably: the JSON arguments were rejected.
    InvalidArgs(String),
    /// Presumably: the provider throttled the request.
    RateLimited(String),
    /// Presumably: a network/transport-level failure.
    Transport(String),
    /// Any other failure.
    Other(String),
}
146
147impl fmt::Display for ClientError {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        match self {
150            Self::MethodNotFound(message)
151            | Self::InvalidArgs(message)
152            | Self::RateLimited(message)
153            | Self::Transport(message)
154            | Self::Other(message) => message.fmt(f),
155        }
156    }
157}
158
/// [`ClientError`] is a standard error; its message comes from `Display` and it has no source.
impl std::error::Error for ClientError {}
160
/// Shared connector-layer errors.
///
/// Message-only variants display their message verbatim; the structured
/// variants (`DuplicateProvider`, `UnknownProvider`, `MissingHeader`,
/// `InvalidHeader`, `TimestampOutOfWindow`) display a fixed sentence.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider id is already registered.
    DuplicateProvider(String),
    /// Message describing a duplicate delivery.
    DuplicateDelivery(String),
    /// No connector is registered for this provider id.
    UnknownProvider(String),
    /// Name of a required header that was not present.
    MissingHeader(String),
    /// A header was present but its value was rejected.
    InvalidHeader {
        /// Header name.
        name: String,
        /// Why the value was rejected.
        detail: String,
    },
    /// Signature verification failure message; see [`ConnectorError::invalid_signature`].
    InvalidSignature(String),
    /// `timestamp` was outside `window` around `now`.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON failure message; the `From<serde_json::Error>` impl produces this variant.
    Json(String),
    /// Secret lookup failure message; the `From<SecretError>` impl produces this variant.
    Secret(String),
    /// Event-log failure message; the `From<LogError>` impl produces this variant.
    EventLog(String),
    /// Presumably: an error raised by the Harn runtime.
    HarnRuntime(String),
    /// Outbound client failure; the `From<ClientError>` impl produces this variant.
    Client(ClientError),
    /// Presumably: the requested operation is not supported by this connector.
    Unsupported(String),
    /// Presumably: a failure while activating trigger bindings.
    Activation(String),
}
186
187impl ConnectorError {
188    pub fn invalid_signature(message: impl Into<String>) -> Self {
189        Self::InvalidSignature(message.into())
190    }
191}
192
193impl fmt::Display for ConnectorError {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        match self {
196            Self::DuplicateProvider(provider) => {
197                write!(f, "connector provider `{provider}` is already registered")
198            }
199            Self::DuplicateDelivery(message) => message.fmt(f),
200            Self::UnknownProvider(provider) => {
201                write!(f, "connector provider `{provider}` is not registered")
202            }
203            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
204            Self::InvalidHeader { name, detail } => {
205                write!(f, "invalid header `{name}`: {detail}")
206            }
207            Self::InvalidSignature(message)
208            | Self::Json(message)
209            | Self::Secret(message)
210            | Self::EventLog(message)
211            | Self::HarnRuntime(message)
212            | Self::Unsupported(message)
213            | Self::Activation(message) => message.fmt(f),
214            Self::TimestampOutOfWindow {
215                timestamp,
216                now,
217                window,
218            } => write!(
219                f,
220                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
221            ),
222            Self::Client(error) => error.fmt(f),
223        }
224    }
225}
226
/// [`ConnectorError`] is a standard error; its message comes from `Display` and it exposes no source.
impl std::error::Error for ConnectorError {}
228
229impl From<crate::event_log::LogError> for ConnectorError {
230    fn from(value: crate::event_log::LogError) -> Self {
231        Self::EventLog(value.to_string())
232    }
233}
234
235impl From<crate::secrets::SecretError> for ConnectorError {
236    fn from(value: crate::secrets::SecretError) -> Self {
237        Self::Secret(value.to_string())
238    }
239}
240
241impl From<serde_json::Error> for ConnectorError {
242    fn from(value: serde_json::Error) -> Self {
243        Self::Json(value.to_string())
244    }
245}
246
247impl From<ClientError> for ConnectorError {
248    fn from(value: ClientError) -> Self {
249        Self::Client(value)
250    }
251}
252
/// Startup context shared with connector instances.
///
/// Handed by value to [`Connector::init`]. Every field is an `Arc`, so cloning
/// the context only bumps reference counts.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Shared event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Secret provider used by connectors.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe inbox index (the type `postprocess_normalized_event` claims keys in).
    pub inbox: Arc<InboxIndex>,
    /// Shared connector metrics registry.
    pub metrics: Arc<MetricsRegistry>,
    /// NOTE(review): rate-limiter factory; its semantics live outside this chunk.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
262
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// Produced by `MetricsRegistry::snapshot`, which loads each counter
/// independently with `Relaxed` ordering. The fields are therefore not a
/// single atomic point-in-time view.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    /// Inbox dedupe claims written.
    pub inbox_claims_written: u64,
    /// Duplicates rejected by either the fast path or the durable lookup.
    pub inbox_duplicates_rejected: u64,
    /// Duplicates rejected via the fast path.
    pub inbox_fast_path_hits: u64,
    /// Duplicates rejected via the durable lookup.
    pub inbox_durable_hits: u64,
    /// Cumulative count of expired inbox entries.
    pub inbox_expired_entries: u64,
    /// Most recently reported number of active inbox entries.
    pub inbox_active_entries: u64,
    /// Linear deliveries rejected for their timestamp.
    pub linear_timestamp_rejections_total: u64,
    /// Successful dispatches.
    pub dispatch_succeeded_total: u64,
    /// Failed dispatches.
    pub dispatch_failed_total: u64,
    /// Retries scheduled.
    pub retry_scheduled_total: u64,
    /// Successful Slack deliveries.
    pub slack_delivery_success_total: u64,
    /// Failed Slack deliveries.
    pub slack_delivery_failure_total: u64,
}
279
/// Label name -> label value for one metric sample. A `BTreeMap` keeps labels
/// sorted by name, which fixes the order they are rendered in.
type MetricLabels = BTreeMap<String, String>;
281
/// Accumulated state of one labelled histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative bucket counts keyed by the rendered upper bound (`le`), including `+Inf`.
    /// Keys are strings, so iteration order is lexicographic, not numeric:
    /// `+Inf` sorts first and `1024` sorts before `128`.
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
288
/// Process-wide slot holding the currently installed metrics registry, if any.
/// The `OnceLock` lazily creates the mutex; the `Option` inside can be swapped or cleared.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
290
291pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
292    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
293    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
294}
295
296pub fn clear_active_metrics_registry() {
297    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
298        *slot.lock().expect("active metrics registry poisoned") = None;
299    }
300}
301
302pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
303    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
304        slot.lock()
305            .expect("active metrics registry poisoned")
306            .clone()
307    })
308}
309
/// Shared metrics surface for connector-local counters and timings.
///
/// The fixed connector counters are `AtomicU64`s updated with `Relaxed`
/// ordering. Free-form and labelled metrics live behind `std::sync::Mutex`es.
/// `Default` yields an empty registry with every counter at zero.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Stored (not accumulated): holds the most recently reported value.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters keyed by their raw (unsanitized) name.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled counters keyed by (family name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    // Labelled gauges keyed by (family name, labels); the last written value wins.
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    // Labelled histograms keyed by (family name, labels).
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
}
330
331impl MetricsRegistry {
332    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
333    const SIZE_BUCKETS: [f64; 9] = [
334        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
335    ];
336
337    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
338        ConnectorMetricsSnapshot {
339            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
340            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
341            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
342            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
343            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
344            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
345            linear_timestamp_rejections_total: self
346                .linear_timestamp_rejections_total
347                .load(Ordering::Relaxed),
348            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
349            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
350            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
351            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
352            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
353        }
354    }
355
356    pub(crate) fn record_inbox_claim(&self) {
357        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
358    }
359
360    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
361        self.inbox_duplicates_rejected
362            .fetch_add(1, Ordering::Relaxed);
363        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
364    }
365
366    pub(crate) fn record_inbox_duplicate_durable(&self) {
367        self.inbox_duplicates_rejected
368            .fetch_add(1, Ordering::Relaxed);
369        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
370    }
371
372    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
373        if count > 0 {
374            self.inbox_expired_entries
375                .fetch_add(count, Ordering::Relaxed);
376        }
377    }
378
379    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
380        self.inbox_active_entries
381            .store(count as u64, Ordering::Relaxed);
382    }
383
384    pub fn record_linear_timestamp_rejection(&self) {
385        self.linear_timestamp_rejections_total
386            .fetch_add(1, Ordering::Relaxed);
387    }
388
389    pub fn record_dispatch_succeeded(&self) {
390        self.dispatch_succeeded_total
391            .fetch_add(1, Ordering::Relaxed);
392    }
393
394    pub fn record_dispatch_failed(&self) {
395        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
396    }
397
398    pub fn record_retry_scheduled(&self) {
399        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
400    }
401
402    pub fn record_slack_delivery_success(&self) {
403        self.slack_delivery_success_total
404            .fetch_add(1, Ordering::Relaxed);
405    }
406
407    pub fn record_slack_delivery_failure(&self) {
408        self.slack_delivery_failure_total
409            .fetch_add(1, Ordering::Relaxed);
410    }
411
412    pub fn record_custom_counter(&self, name: &str, amount: u64) {
413        if amount == 0 {
414            return;
415        }
416        let mut counters = self
417            .custom_counters
418            .lock()
419            .expect("custom counters poisoned");
420        *counters.entry(name.to_string()).or_default() += amount;
421    }
422
423    pub fn record_http_request(
424        &self,
425        endpoint: &str,
426        method: &str,
427        status: u16,
428        duration: StdDuration,
429        body_size_bytes: usize,
430    ) {
431        self.increment_counter(
432            "harn_http_requests_total",
433            labels([
434                ("endpoint", endpoint),
435                ("method", method),
436                ("status", &status.to_string()),
437            ]),
438            1,
439        );
440        self.observe_histogram(
441            "harn_http_request_duration_seconds",
442            labels([("endpoint", endpoint)]),
443            duration.as_secs_f64(),
444            &Self::DURATION_BUCKETS,
445        );
446        self.observe_histogram(
447            "harn_http_body_size_bytes",
448            labels([("endpoint", endpoint)]),
449            body_size_bytes as f64,
450            &Self::SIZE_BUCKETS,
451        );
452    }
453
454    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
455        self.increment_counter(
456            "harn_trigger_received_total",
457            labels([("trigger_id", trigger_id), ("provider", provider)]),
458            1,
459        );
460    }
461
462    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
463        self.increment_counter(
464            "harn_trigger_deduped_total",
465            labels([("trigger_id", trigger_id), ("reason", reason)]),
466            1,
467        );
468    }
469
470    pub fn record_trigger_predicate_evaluation(
471        &self,
472        trigger_id: &str,
473        result: bool,
474        cost_usd: f64,
475    ) {
476        self.increment_counter(
477            "harn_trigger_predicate_evaluations_total",
478            labels([
479                ("trigger_id", trigger_id),
480                ("result", if result { "true" } else { "false" }),
481            ]),
482            1,
483        );
484        self.observe_histogram(
485            "harn_trigger_predicate_cost_usd",
486            labels([("trigger_id", trigger_id)]),
487            cost_usd.max(0.0),
488            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
489        );
490    }
491
492    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
493        self.increment_counter(
494            "harn_trigger_dispatched_total",
495            labels([
496                ("trigger_id", trigger_id),
497                ("handler_kind", handler_kind),
498                ("outcome", outcome),
499            ]),
500            1,
501        );
502    }
503
504    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
505        self.increment_counter(
506            "harn_trigger_retries_total",
507            labels([
508                ("trigger_id", trigger_id),
509                ("attempt", &attempt.to_string()),
510            ]),
511            1,
512        );
513    }
514
515    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
516        self.increment_counter(
517            "harn_trigger_dlq_total",
518            labels([("trigger_id", trigger_id), ("reason", reason)]),
519            1,
520        );
521    }
522
523    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
524        self.set_gauge(
525            "harn_trigger_inflight",
526            labels([("trigger_id", trigger_id)]),
527            count as f64,
528        );
529    }
530
531    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
532        self.set_gauge(
533            "harn_trigger_budget_cost_today_usd",
534            labels([("trigger_id", trigger_id)]),
535            cost_usd.max(0.0),
536        );
537    }
538
539    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
540        self.increment_counter(
541            "harn_trigger_budget_exhausted_total",
542            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
543            1,
544        );
545    }
546
547    pub fn record_event_log_append(
548        &self,
549        topic: &str,
550        duration: StdDuration,
551        payload_bytes: usize,
552    ) {
553        self.observe_histogram(
554            "harn_event_log_append_duration_seconds",
555            labels([("topic", topic)]),
556            duration.as_secs_f64(),
557            &Self::DURATION_BUCKETS,
558        );
559        self.set_gauge(
560            "harn_event_log_topic_size_bytes",
561            labels([("topic", topic)]),
562            payload_bytes as f64,
563        );
564    }
565
566    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
567        self.set_gauge(
568            "harn_event_log_consumer_lag",
569            labels([("topic", topic), ("consumer", consumer)]),
570            lag as f64,
571        );
572    }
573
574    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
575        self.increment_counter(
576            "harn_a2a_hops_total",
577            labels([("target", target), ("outcome", outcome)]),
578            1,
579        );
580        self.observe_histogram(
581            "harn_a2a_hop_duration_seconds",
582            labels([("target", target)]),
583            duration.as_secs_f64(),
584            &Self::DURATION_BUCKETS,
585        );
586    }
587
588    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
589        self.set_gauge(
590            "harn_worker_queue_depth",
591            labels([("queue", queue)]),
592            depth as f64,
593        );
594    }
595
596    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
597        self.observe_histogram(
598            "harn_worker_queue_claim_age_seconds",
599            labels([("queue", queue)]),
600            age_seconds.max(0.0),
601            &Self::DURATION_BUCKETS,
602        );
603    }
604
605    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
606        self.increment_counter(
607            "harn_llm_calls_total",
608            labels([
609                ("provider", provider),
610                ("model", model),
611                ("outcome", outcome),
612            ]),
613            1,
614        );
615        if cost_usd > 0.0 {
616            self.increment_counter(
617                "harn_llm_cost_usd_total",
618                labels([("provider", provider), ("model", model)]),
619                cost_usd,
620            );
621        } else {
622            self.ensure_counter(
623                "harn_llm_cost_usd_total",
624                labels([("provider", provider), ("model", model)]),
625            );
626        }
627    }
628
629    pub fn record_llm_cache_hit(&self, provider: &str) {
630        self.increment_counter(
631            "harn_llm_cache_hits_total",
632            labels([("provider", provider)]),
633            1,
634        );
635    }
636
637    pub fn render_prometheus(&self) -> String {
638        let snapshot = self.snapshot();
639        let counters = [
640            (
641                "connector_linear_timestamp_rejections_total",
642                snapshot.linear_timestamp_rejections_total,
643            ),
644            (
645                "dispatch_succeeded_total",
646                snapshot.dispatch_succeeded_total,
647            ),
648            ("dispatch_failed_total", snapshot.dispatch_failed_total),
649            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
650            ("retry_scheduled_total", snapshot.retry_scheduled_total),
651            (
652                "slack_events_delivery_success_total",
653                snapshot.slack_delivery_success_total,
654            ),
655            (
656                "slack_events_delivery_failure_total",
657                snapshot.slack_delivery_failure_total,
658            ),
659        ];
660
661        let mut rendered = String::new();
662        for (name, value) in counters {
663            rendered.push_str("# TYPE ");
664            rendered.push_str(name);
665            rendered.push_str(" counter\n");
666            rendered.push_str(name);
667            rendered.push(' ');
668            rendered.push_str(&value.to_string());
669            rendered.push('\n');
670        }
671        let custom_counters = self
672            .custom_counters
673            .lock()
674            .expect("custom counters poisoned");
675        for (name, value) in custom_counters.iter() {
676            let metric_name = format!(
677                "connector_custom_{}_total",
678                name.chars()
679                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
680                        ch
681                    } else {
682                        '_'
683                    })
684                    .collect::<String>()
685            );
686            rendered.push_str("# TYPE ");
687            rendered.push_str(&metric_name);
688            rendered.push_str(" counter\n");
689            rendered.push_str(&metric_name);
690            rendered.push(' ');
691            rendered.push_str(&value.to_string());
692            rendered.push('\n');
693        }
694        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
695        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
696        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
697        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
698        self.render_generic_metrics(&mut rendered);
699        rendered
700    }
701
702    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
703        let amount = amount.into();
704        if amount <= 0.0 || !amount.is_finite() {
705            return;
706        }
707        let mut counters = self.counters.lock().expect("metrics counters poisoned");
708        *counters.entry((name.to_string(), labels)).or_default() += amount;
709    }
710
711    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
712        let mut counters = self.counters.lock().expect("metrics counters poisoned");
713        counters.entry((name.to_string(), labels)).or_default();
714    }
715
716    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
717        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
718        gauges.insert((name.to_string(), labels), value);
719    }
720
721    fn observe_histogram(
722        &self,
723        name: &str,
724        labels: MetricLabels,
725        value: f64,
726        bucket_bounds: &[f64],
727    ) {
728        if !value.is_finite() {
729            return;
730        }
731        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
732        let histogram = histograms
733            .entry((name.to_string(), labels))
734            .or_insert_with(|| HistogramMetric {
735                buckets: bucket_bounds
736                    .iter()
737                    .map(|bound| (prometheus_float(*bound), 0))
738                    .chain(std::iter::once(("+Inf".to_string(), 0)))
739                    .collect(),
740                count: 0,
741                sum: 0.0,
742            });
743        histogram.count += 1;
744        histogram.sum += value;
745        for bound in bucket_bounds {
746            if value <= *bound {
747                let key = prometheus_float(*bound);
748                *histogram.buckets.entry(key).or_default() += 1;
749            }
750        }
751        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
752    }
753
754    fn render_generic_metrics(&self, rendered: &mut String) {
755        let counters = self
756            .counters
757            .lock()
758            .expect("metrics counters poisoned")
759            .clone();
760        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
761        let histograms = self
762            .histograms
763            .lock()
764            .expect("metrics histograms poisoned")
765            .clone();
766
767        for name in metric_family_names(MetricKind::Counter) {
768            rendered.push_str("# TYPE ");
769            rendered.push_str(name);
770            rendered.push_str(" counter\n");
771            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
772                render_sample(rendered, sample_name, labels, *value);
773            }
774        }
775        for name in metric_family_names(MetricKind::Gauge) {
776            rendered.push_str("# TYPE ");
777            rendered.push_str(name);
778            rendered.push_str(" gauge\n");
779            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
780                render_sample(rendered, sample_name, labels, *value);
781            }
782        }
783        for name in metric_family_names(MetricKind::Histogram) {
784            rendered.push_str("# TYPE ");
785            rendered.push_str(name);
786            rendered.push_str(" histogram\n");
787            for ((sample_name, labels), histogram) in
788                histograms.iter().filter(|((n, _), _)| n == name)
789            {
790                for (le, value) in &histogram.buckets {
791                    let mut bucket_labels = labels.clone();
792                    bucket_labels.insert("le".to_string(), le.clone());
793                    render_sample(
794                        rendered,
795                        &format!("{sample_name}_bucket"),
796                        &bucket_labels,
797                        *value as f64,
798                    );
799                }
800                render_sample(
801                    rendered,
802                    &format!("{sample_name}_sum"),
803                    labels,
804                    histogram.sum,
805                );
806                render_sample(
807                    rendered,
808                    &format!("{sample_name}_count"),
809                    labels,
810                    histogram.count as f64,
811                );
812            }
813        }
814    }
815}
816
/// Prometheus metric type; selects which family list `metric_family_names` returns.
#[derive(Clone, Copy)]
enum MetricKind {
    /// Rendered with `# TYPE <name> counter`.
    Counter,
    /// Rendered with `# TYPE <name> gauge`.
    Gauge,
    /// Rendered with `# TYPE <name> histogram`.
    Histogram,
}
823
824fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
825    match kind {
826        MetricKind::Counter => &[
827            "harn_http_requests_total",
828            "harn_trigger_received_total",
829            "harn_trigger_deduped_total",
830            "harn_trigger_predicate_evaluations_total",
831            "harn_trigger_dispatched_total",
832            "harn_trigger_retries_total",
833            "harn_trigger_dlq_total",
834            "harn_trigger_budget_exhausted_total",
835            "harn_a2a_hops_total",
836            "harn_llm_calls_total",
837            "harn_llm_cost_usd_total",
838            "harn_llm_cache_hits_total",
839        ],
840        MetricKind::Gauge => &[
841            "harn_trigger_inflight",
842            "harn_event_log_topic_size_bytes",
843            "harn_event_log_consumer_lag",
844            "harn_trigger_budget_cost_today_usd",
845            "harn_worker_queue_depth",
846        ],
847        MetricKind::Histogram => &[
848            "harn_http_request_duration_seconds",
849            "harn_http_body_size_bytes",
850            "harn_trigger_predicate_cost_usd",
851            "harn_event_log_append_duration_seconds",
852            "harn_a2a_hop_duration_seconds",
853            "harn_worker_queue_claim_age_seconds",
854        ],
855    }
856}
857
858fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
859    pairs
860        .into_iter()
861        .map(|(name, value)| (name.to_string(), value.to_string()))
862        .collect()
863}
864
865fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
866    rendered.push_str(name);
867    if !labels.is_empty() {
868        rendered.push('{');
869        for (index, (label, label_value)) in labels.iter().enumerate() {
870            if index > 0 {
871                rendered.push(',');
872            }
873            rendered.push_str(label);
874            rendered.push_str("=\"");
875            rendered.push_str(&escape_label_value(label_value));
876            rendered.push('"');
877        }
878        rendered.push('}');
879    }
880    rendered.push(' ');
881    rendered.push_str(&prometheus_float(value));
882    rendered.push('\n');
883}
884
/// Escapes a label value for the Prometheus text exposition format.
///
/// Backslash, double-quote, and line feed become `\\`, `\"`, and `\n`.
/// Every other character, including non-ASCII, is copied through unchanged.
fn escape_label_value(value: &str) -> String {
    // Most values need no escaping, so the input length is a good lower bound.
    // Building into one buffer avoids allocating a temporary per character.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
896
/// Formats a sample value for the Prometheus text exposition format.
///
/// Rendering rules:
/// - Special values use the spellings the format defines: `NaN`, `+Inf`, `-Inf`.
/// - Integral values render without a decimal point.
/// - Other values are rounded to six decimal places, with trailing zeros (and a
///   dangling `.`) trimmed.
/// - If that rounding would erase a non-zero value (magnitude below 5e-7), the
///   full-precision shortest round-trip `Display` form is used instead, so a
///   non-zero sample is never exported as `0`.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Previously only +Inf was special-cased; -inf leaked Rust's "-inf".
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        return format!("{value:.0}");
    }

    let rounded = format!("{value:.6}");
    let trimmed = rounded.trim_end_matches('0').trim_end_matches('.');
    if trimmed == "0" || trimmed == "-0" {
        // fract() != 0 guarantees the value is non-zero, so the six-decimal
        // rounding lost all significant digits; keep full precision instead.
        format!("{value}")
    } else {
        trimmed.to_string()
    }
}
911
912/// Provider payload schema metadata exposed by a connector.
913#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
914pub struct ProviderPayloadSchema {
915    pub harn_schema_name: String,
916    #[serde(default)]
917    pub json_schema: JsonValue,
918}
919
920impl ProviderPayloadSchema {
921    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
922        Self {
923            harn_schema_name: harn_schema_name.into(),
924            json_schema,
925        }
926    }
927
928    pub fn named(harn_schema_name: impl Into<String>) -> Self {
929        Self::new(harn_schema_name, JsonValue::Null)
930    }
931}
932
933impl Default for ProviderPayloadSchema {
934    fn default() -> Self {
935        Self::named("raw")
936    }
937}
938
939/// High-level transport kind a connector supports.
940#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
941#[serde(transparent)]
942pub struct TriggerKind(String);
943
944impl TriggerKind {
945    pub fn new(value: impl Into<String>) -> Self {
946        Self(value.into())
947    }
948
949    pub fn as_str(&self) -> &str {
950        self.0.as_str()
951    }
952}
953
954impl From<&str> for TriggerKind {
955    fn from(value: &str) -> Self {
956        Self::new(value)
957    }
958}
959
960impl From<String> for TriggerKind {
961    fn from(value: String) -> Self {
962        Self::new(value)
963    }
964}
965
966/// Future trigger manifest binding routed to a connector activation.
967#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
968pub struct TriggerBinding {
969    pub provider: ProviderId,
970    pub kind: TriggerKind,
971    pub binding_id: String,
972    #[serde(default)]
973    pub dedupe_key: Option<String>,
974    #[serde(default = "default_dedupe_retention_days")]
975    pub dedupe_retention_days: u32,
976    #[serde(default)]
977    pub config: JsonValue,
978}
979
980impl TriggerBinding {
981    pub fn new(
982        provider: ProviderId,
983        kind: impl Into<TriggerKind>,
984        binding_id: impl Into<String>,
985    ) -> Self {
986        Self {
987            provider,
988            kind: kind.into(),
989            binding_id: binding_id.into(),
990            dedupe_key: None,
991            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
992            config: JsonValue::Null,
993        }
994    }
995}
996
997fn default_dedupe_retention_days() -> u32 {
998    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
999}
1000
1001/// Small in-memory trigger-binding registry used to fan bindings into connectors.
1002#[derive(Clone, Debug, Default)]
1003pub struct TriggerRegistry {
1004    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1005}
1006
1007impl TriggerRegistry {
1008    pub fn register(&mut self, binding: TriggerBinding) {
1009        self.bindings
1010            .entry(binding.provider.clone())
1011            .or_default()
1012            .push(binding);
1013    }
1014
1015    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1016        &self.bindings
1017    }
1018
1019    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1020        self.bindings
1021            .get(provider)
1022            .map(Vec::as_slice)
1023            .unwrap_or(&[])
1024    }
1025}
1026
1027/// Metadata returned from connector activation.
1028#[derive(Clone, Debug, PartialEq, Eq)]
1029pub struct ActivationHandle {
1030    pub provider: ProviderId,
1031    pub binding_count: usize,
1032}
1033
1034impl ActivationHandle {
1035    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1036        Self {
1037            provider,
1038            binding_count,
1039        }
1040    }
1041}
1042
1043/// Provider-native inbound request payload preserved as raw bytes.
1044#[derive(Clone, Debug, PartialEq)]
1045pub struct RawInbound {
1046    pub kind: String,
1047    pub headers: BTreeMap<String, String>,
1048    pub query: BTreeMap<String, String>,
1049    pub body: Vec<u8>,
1050    pub received_at: OffsetDateTime,
1051    pub occurred_at: Option<OffsetDateTime>,
1052    pub tenant_id: Option<TenantId>,
1053    pub metadata: JsonValue,
1054}
1055
1056impl RawInbound {
1057    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1058        Self {
1059            kind: kind.into(),
1060            headers,
1061            query: BTreeMap::new(),
1062            body,
1063            received_at: clock::now_utc(),
1064            occurred_at: None,
1065            tenant_id: None,
1066            metadata: JsonValue::Null,
1067        }
1068    }
1069
1070    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1071        Ok(serde_json::from_slice(&self.body)?)
1072    }
1073}
1074
/// Token-bucket configuration shared across connector clients.
///
/// `capacity` is the bucket size. `refill_tokens` tokens are added every
/// `refill_interval`. The default is a 60-token bucket refilled by one token
/// per second.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    fn default() -> Self {
        let one_second = StdDuration::from_secs(1);
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: one_second,
        }
    }
}
1092
1093#[derive(Clone, Debug)]
1094struct TokenBucket {
1095    tokens: f64,
1096    last_refill: ClockInstant,
1097}
1098
1099impl TokenBucket {
1100    fn full(config: RateLimitConfig) -> Self {
1101        Self {
1102            tokens: config.capacity as f64,
1103            last_refill: clock::instant_now(),
1104        }
1105    }
1106
1107    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1108        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1109        let rate = config.refill_tokens.max(1) as f64 / interval;
1110        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1111        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1112        self.last_refill = now;
1113    }
1114
1115    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1116        self.refill(config, now);
1117        if self.tokens >= 1.0 {
1118            self.tokens -= 1.0;
1119            true
1120        } else {
1121            false
1122        }
1123    }
1124
1125    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1126        if self.tokens >= 1.0 {
1127            return StdDuration::ZERO;
1128        }
1129        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1130        let rate = config.refill_tokens.max(1) as f64 / interval;
1131        let missing = (1.0 - self.tokens).max(0.0);
1132        StdDuration::from_secs_f64((missing / rate).max(0.001))
1133    }
1134}
1135
1136/// Shared per-provider, per-key token bucket factory for outbound connector clients.
1137#[derive(Debug)]
1138pub struct RateLimiterFactory {
1139    config: RateLimitConfig,
1140    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1141}
1142
1143impl RateLimiterFactory {
1144    pub fn new(config: RateLimitConfig) -> Self {
1145        Self {
1146            config,
1147            buckets: Mutex::new(HashMap::new()),
1148        }
1149    }
1150
1151    pub fn config(&self) -> RateLimitConfig {
1152        self.config
1153    }
1154
1155    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1156        ScopedRateLimiter {
1157            factory: self,
1158            provider: provider.clone(),
1159            key: key.into(),
1160        }
1161    }
1162
1163    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1164        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1165        let bucket = buckets
1166            .entry((provider.as_str().to_string(), key.to_string()))
1167            .or_insert_with(|| TokenBucket::full(self.config));
1168        bucket.try_acquire(self.config, clock::instant_now())
1169    }
1170
1171    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1172        loop {
1173            let wait = {
1174                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1175                let bucket = buckets
1176                    .entry((provider.as_str().to_string(), key.to_string()))
1177                    .or_insert_with(|| TokenBucket::full(self.config));
1178                if bucket.try_acquire(self.config, clock::instant_now()) {
1179                    return;
1180                }
1181                bucket.wait_duration(self.config)
1182            };
1183            tokio::time::sleep(wait).await;
1184        }
1185    }
1186}
1187
1188impl Default for RateLimiterFactory {
1189    fn default() -> Self {
1190        Self::new(RateLimitConfig::default())
1191    }
1192}
1193
1194/// Borrowed view onto a single provider/key rate-limit scope.
1195#[derive(Clone, Debug)]
1196pub struct ScopedRateLimiter<'a> {
1197    factory: &'a RateLimiterFactory,
1198    provider: ProviderId,
1199    key: String,
1200}
1201
1202impl<'a> ScopedRateLimiter<'a> {
1203    pub fn try_acquire(&self) -> bool {
1204        self.factory.try_acquire(&self.provider, &self.key)
1205    }
1206
1207    pub async fn acquire(&self) {
1208        self.factory.acquire(&self.provider, &self.key).await;
1209    }
1210}
1211
1212/// Runtime connector registry keyed by provider id.
1213pub struct ConnectorRegistry {
1214    connectors: BTreeMap<ProviderId, ConnectorHandle>,
1215}
1216
1217impl ConnectorRegistry {
1218    pub fn empty() -> Self {
1219        Self {
1220            connectors: BTreeMap::new(),
1221        }
1222    }
1223
1224    pub fn with_defaults() -> Self {
1225        let mut registry = Self::empty();
1226        for provider in registered_provider_metadata() {
1227            registry
1228                .register(default_connector_for_provider(&provider))
1229                .expect("default connector registration should not fail");
1230        }
1231        registry
1232    }
1233
1234    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1235        let provider = connector.provider_id().clone();
1236        if self.connectors.contains_key(&provider) {
1237            return Err(ConnectorError::DuplicateProvider(provider.0));
1238        }
1239        self.connectors
1240            .insert(provider, Arc::new(AsyncMutex::new(connector)));
1241        Ok(())
1242    }
1243
1244    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1245        self.connectors.get(id).cloned()
1246    }
1247
1248    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1249        self.connectors.remove(id)
1250    }
1251
1252    pub fn list(&self) -> Vec<ProviderId> {
1253        self.connectors.keys().cloned().collect()
1254    }
1255
1256    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1257        for connector in self.connectors.values() {
1258            connector.lock().await.init(ctx.clone()).await?;
1259        }
1260        Ok(())
1261    }
1262
1263    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1264        let mut clients = BTreeMap::new();
1265        for (provider, connector) in &self.connectors {
1266            let client = connector.lock().await.client();
1267            clients.insert(provider.clone(), client);
1268        }
1269        clients
1270    }
1271
1272    pub async fn activate_all(
1273        &self,
1274        registry: &TriggerRegistry,
1275    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1276        let mut handles = Vec::new();
1277        for (provider, connector) in &self.connectors {
1278            let bindings = registry.bindings_for(provider);
1279            if bindings.is_empty() {
1280                continue;
1281            }
1282            let connector = connector.lock().await;
1283            handles.push(connector.activate(bindings).await?);
1284        }
1285        Ok(handles)
1286    }
1287}
1288
1289impl Default for ConnectorRegistry {
1290    fn default() -> Self {
1291        Self::with_defaults()
1292    }
1293}
1294
1295fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1296    // The provider catalog on main registers `github` with
1297    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
1298    // before a native connector existed the catalog auto-wired a
1299    // GenericWebhookConnector. Now that #170 lands a first-class
1300    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
1301    // provider_id "github" here and return the native connector instead of a
1302    // webhook-backed fallback. This keeps manifests that say
1303    // `provider = "github"` pointed at the new connector without requiring
1304    // users to switch to a distinct provider_id.
1305    if provider.provider == "github" {
1306        return Box::new(GitHubConnector::new());
1307    }
1308    if provider.provider == "linear" {
1309        return Box::new(LinearConnector::new());
1310    }
1311    if provider.provider == "slack" {
1312        return Box::new(SlackConnector::new());
1313    }
1314    if provider.provider == "notion" {
1315        return Box::new(NotionConnector::new());
1316    }
1317    if provider.provider == "a2a-push" {
1318        return Box::new(A2aPushConnector::new());
1319    }
1320    match &provider.runtime {
1321        ProviderRuntimeMetadata::Builtin {
1322            connector,
1323            default_signature_variant,
1324        } => match connector.as_str() {
1325            "cron" => Box::new(CronConnector::new()),
1326            "webhook" => {
1327                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1328                    .expect("catalog webhook signature variant must be valid");
1329                Box::new(GenericWebhookConnector::with_profile(
1330                    WebhookProviderProfile::new(
1331                        ProviderId::from(provider.provider.clone()),
1332                        provider.schema_name.clone(),
1333                        variant,
1334                    ),
1335                ))
1336            }
1337            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1338        },
1339        ProviderRuntimeMetadata::Placeholder => {
1340            Box::new(PlaceholderConnector::from_metadata(provider))
1341        }
1342    }
1343}
1344
1345struct PlaceholderConnector {
1346    provider_id: ProviderId,
1347    kinds: Vec<TriggerKind>,
1348    schema_name: String,
1349}
1350
1351impl PlaceholderConnector {
1352    fn from_metadata(metadata: &ProviderMetadata) -> Self {
1353        Self {
1354            provider_id: ProviderId::from(metadata.provider.clone()),
1355            kinds: metadata
1356                .kinds
1357                .iter()
1358                .cloned()
1359                .map(TriggerKind::from)
1360                .collect(),
1361            schema_name: metadata.schema_name.clone(),
1362        }
1363    }
1364}
1365
1366struct PlaceholderClient;
1367
1368#[async_trait]
1369impl ConnectorClient for PlaceholderClient {
1370    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1371        Err(ClientError::Other(format!(
1372            "connector client method '{method}' is not implemented for this provider"
1373        )))
1374    }
1375}
1376
1377#[async_trait]
1378impl Connector for PlaceholderConnector {
1379    fn provider_id(&self) -> &ProviderId {
1380        &self.provider_id
1381    }
1382
1383    fn kinds(&self) -> &[TriggerKind] {
1384        &self.kinds
1385    }
1386
1387    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1388        Ok(())
1389    }
1390
1391    async fn activate(
1392        &self,
1393        bindings: &[TriggerBinding],
1394    ) -> Result<ActivationHandle, ConnectorError> {
1395        Ok(ActivationHandle::new(
1396            self.provider_id.clone(),
1397            bindings.len(),
1398        ))
1399    }
1400
1401    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1402        Err(ConnectorError::Unsupported(format!(
1403            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1404            self.provider_id.as_str()
1405        )))
1406    }
1407
1408    fn payload_schema(&self) -> ProviderPayloadSchema {
1409        ProviderPayloadSchema::named(self.schema_name.clone())
1410    }
1411
1412    fn client(&self) -> Arc<dyn ConnectorClient> {
1413        Arc::new(PlaceholderClient)
1414    }
1415}
1416
1417pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1418    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1419        *slot.borrow_mut() = clients
1420            .into_iter()
1421            .map(|(provider, client)| (provider.as_str().to_string(), client))
1422            .collect();
1423    });
1424}
1425
1426pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1427    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1428}
1429
1430pub fn clear_active_connector_clients() {
1431    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1432}
1433
// Unit tests for the connector registry, trigger registry, rate limiter, raw
// inbound payloads, and Prometheus metric rendering.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Client stub that echoes the requested method back as `{"method": ...}`.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Minimal connector whose only behavior of interest is counting `activate`
    // calls through a shared counter, so tests can see which connectors ran.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            // Record the call before reporting the binding count.
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering a second connector with an existing provider id must fail
    // with `DuplicateProvider` carrying that id.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only connectors with at least one trigger binding are activated; github
    // receives both of its bindings in a single call, slack is never called.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // With a one-token bucket and a 60s refill, a second acquire on the same
    // (provider, key) fails, while other keys and providers get their own buckets.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body` parses the untouched body bytes.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is populated from the provider catalog.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Records one observation for every family listed in `metric_family_names`,
    // then checks that the rendered exposition text contains the expected
    // sample line for each. The needles list labels alphabetically, so
    // rendering must emit labels in that order for `contains` to match.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}