Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod hmac;
32pub mod slack;
33#[cfg(test)]
34pub(crate) mod test_util;
35pub mod webhook;
36
37pub use cron::{CatchupMode, CronConnector};
38pub use github::GitHubConnector;
39pub use hmac::{
40    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
41    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER, DEFAULT_SLACK_SIGNATURE_HEADER,
42    DEFAULT_SLACK_TIMESTAMP_HEADER, DEFAULT_STANDARD_WEBHOOKS_ID_HEADER,
43    DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER, DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER,
44    DEFAULT_STRIPE_SIGNATURE_HEADER, SIGNATURE_VERIFY_AUDIT_TOPIC,
45};
46pub use slack::SlackConnector;
47use webhook::WebhookProviderProfile;
48pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
49
50/// Shared owned handle to a connector instance registered with the runtime.
51pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
52
53thread_local! {
54    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
55        RefCell::new(BTreeMap::new());
56}
57
58/// Provider implementation contract for inbound connectors.
59#[async_trait]
60pub trait Connector: Send + Sync {
61    /// Stable provider id such as `github`, `slack`, or `webhook`.
62    fn provider_id(&self) -> &ProviderId;
63
64    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
65    fn kinds(&self) -> &[TriggerKind];
66
67    /// Called once per connector instance at orchestrator startup.
68    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
69
70    /// Activate the bindings relevant to this connector instance.
71    async fn activate(
72        &self,
73        bindings: &[TriggerBinding],
74    ) -> Result<ActivationHandle, ConnectorError>;
75
76    /// Stop connector-owned background work and flush any connector-local state.
77    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
78        Ok(())
79    }
80
81    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
82    fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
83
84    /// Payload schema surfaced to future trigger-type narrowing.
85    fn payload_schema(&self) -> ProviderPayloadSchema;
86
87    /// Outbound API wrapper exposed to handlers.
88    fn client(&self) -> Arc<dyn ConnectorClient>;
89}
90
91#[derive(Clone, Debug, PartialEq)]
92pub enum PostNormalizeOutcome {
93    Ready(Box<TriggerEvent>),
94    DuplicateDropped,
95}
96
97pub async fn postprocess_normalized_event(
98    inbox: &InboxIndex,
99    binding_id: &str,
100    dedupe_enabled: bool,
101    dedupe_ttl: StdDuration,
102    mut event: TriggerEvent,
103) -> Result<PostNormalizeOutcome, ConnectorError> {
104    if dedupe_enabled && !event.dedupe_claimed() {
105        if !inbox
106            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
107            .await?
108        {
109            return Ok(PostNormalizeOutcome::DuplicateDropped);
110        }
111        event.mark_dedupe_claimed();
112    }
113
114    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
115}
116
117/// Outbound provider client interface used by connector-backed stdlib modules.
118#[async_trait]
119pub trait ConnectorClient: Send + Sync {
120    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
121}
122
123/// Minimal outbound client errors shared by connector implementations.
124#[derive(Clone, Debug, PartialEq, Eq)]
125pub enum ClientError {
126    MethodNotFound(String),
127    InvalidArgs(String),
128    RateLimited(String),
129    Transport(String),
130    Other(String),
131}
132
133impl fmt::Display for ClientError {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        match self {
136            Self::MethodNotFound(message)
137            | Self::InvalidArgs(message)
138            | Self::RateLimited(message)
139            | Self::Transport(message)
140            | Self::Other(message) => message.fmt(f),
141        }
142    }
143}
144
145impl std::error::Error for ClientError {}
146
147/// Shared connector-layer errors.
148#[derive(Debug)]
149pub enum ConnectorError {
150    DuplicateProvider(String),
151    DuplicateDelivery(String),
152    UnknownProvider(String),
153    MissingHeader(String),
154    InvalidHeader {
155        name: String,
156        detail: String,
157    },
158    InvalidSignature(String),
159    TimestampOutOfWindow {
160        timestamp: OffsetDateTime,
161        now: OffsetDateTime,
162        window: time::Duration,
163    },
164    Json(String),
165    Secret(String),
166    EventLog(String),
167    Client(ClientError),
168    Unsupported(String),
169    Activation(String),
170}
171
172impl ConnectorError {
173    pub fn invalid_signature(message: impl Into<String>) -> Self {
174        Self::InvalidSignature(message.into())
175    }
176}
177
178impl fmt::Display for ConnectorError {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        match self {
181            Self::DuplicateProvider(provider) => {
182                write!(f, "connector provider `{provider}` is already registered")
183            }
184            Self::DuplicateDelivery(message) => message.fmt(f),
185            Self::UnknownProvider(provider) => {
186                write!(f, "connector provider `{provider}` is not registered")
187            }
188            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
189            Self::InvalidHeader { name, detail } => {
190                write!(f, "invalid header `{name}`: {detail}")
191            }
192            Self::InvalidSignature(message)
193            | Self::Json(message)
194            | Self::Secret(message)
195            | Self::EventLog(message)
196            | Self::Unsupported(message)
197            | Self::Activation(message) => message.fmt(f),
198            Self::TimestampOutOfWindow {
199                timestamp,
200                now,
201                window,
202            } => write!(
203                f,
204                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
205            ),
206            Self::Client(error) => error.fmt(f),
207        }
208    }
209}
210
211impl std::error::Error for ConnectorError {}
212
213impl From<crate::event_log::LogError> for ConnectorError {
214    fn from(value: crate::event_log::LogError) -> Self {
215        Self::EventLog(value.to_string())
216    }
217}
218
219impl From<crate::secrets::SecretError> for ConnectorError {
220    fn from(value: crate::secrets::SecretError) -> Self {
221        Self::Secret(value.to_string())
222    }
223}
224
225impl From<serde_json::Error> for ConnectorError {
226    fn from(value: serde_json::Error) -> Self {
227        Self::Json(value.to_string())
228    }
229}
230
231impl From<ClientError> for ConnectorError {
232    fn from(value: ClientError) -> Self {
233        Self::Client(value)
234    }
235}
236
237/// Startup context shared with connector instances.
238#[derive(Clone)]
239pub struct ConnectorCtx {
240    pub event_log: Arc<AnyEventLog>,
241    pub secrets: Arc<dyn SecretProvider>,
242    pub inbox: Arc<InboxIndex>,
243    pub metrics: Arc<MetricsRegistry>,
244    pub rate_limiter: Arc<RateLimiterFactory>,
245}
246
247/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
248#[derive(Clone, Debug, Default, PartialEq, Eq)]
249pub struct ConnectorMetricsSnapshot {
250    pub inbox_claims_written: u64,
251    pub inbox_duplicates_rejected: u64,
252    pub inbox_fast_path_hits: u64,
253    pub inbox_durable_hits: u64,
254    pub inbox_expired_entries: u64,
255    pub inbox_active_entries: u64,
256    pub dispatch_succeeded_total: u64,
257    pub dispatch_failed_total: u64,
258    pub retry_scheduled_total: u64,
259}
260
261/// Shared metrics surface for connector-local counters and timings.
262#[derive(Debug, Default)]
263pub struct MetricsRegistry {
264    inbox_claims_written: AtomicU64,
265    inbox_duplicates_rejected: AtomicU64,
266    inbox_fast_path_hits: AtomicU64,
267    inbox_durable_hits: AtomicU64,
268    inbox_expired_entries: AtomicU64,
269    inbox_active_entries: AtomicU64,
270    dispatch_succeeded_total: AtomicU64,
271    dispatch_failed_total: AtomicU64,
272    retry_scheduled_total: AtomicU64,
273}
274
275impl MetricsRegistry {
276    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
277        ConnectorMetricsSnapshot {
278            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
279            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
280            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
281            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
282            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
283            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
284            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
285            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
286            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
287        }
288    }
289
290    pub(crate) fn record_inbox_claim(&self) {
291        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
292    }
293
294    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
295        self.inbox_duplicates_rejected
296            .fetch_add(1, Ordering::Relaxed);
297        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
298    }
299
300    pub(crate) fn record_inbox_duplicate_durable(&self) {
301        self.inbox_duplicates_rejected
302            .fetch_add(1, Ordering::Relaxed);
303        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
304    }
305
306    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
307        if count > 0 {
308            self.inbox_expired_entries
309                .fetch_add(count, Ordering::Relaxed);
310        }
311    }
312
313    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
314        self.inbox_active_entries
315            .store(count as u64, Ordering::Relaxed);
316    }
317
318    pub fn record_dispatch_succeeded(&self) {
319        self.dispatch_succeeded_total
320            .fetch_add(1, Ordering::Relaxed);
321    }
322
323    pub fn record_dispatch_failed(&self) {
324        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
325    }
326
327    pub fn record_retry_scheduled(&self) {
328        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
329    }
330
331    pub fn render_prometheus(&self) -> String {
332        let snapshot = self.snapshot();
333        let counters = [
334            (
335                "dispatch_succeeded_total",
336                snapshot.dispatch_succeeded_total,
337            ),
338            ("dispatch_failed_total", snapshot.dispatch_failed_total),
339            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
340            ("retry_scheduled_total", snapshot.retry_scheduled_total),
341        ];
342
343        let mut rendered = String::new();
344        for (name, value) in counters {
345            rendered.push_str("# TYPE ");
346            rendered.push_str(name);
347            rendered.push_str(" counter\n");
348            rendered.push_str(name);
349            rendered.push(' ');
350            rendered.push_str(&value.to_string());
351            rendered.push('\n');
352        }
353        rendered
354    }
355}
356
357/// Provider payload schema metadata exposed by a connector.
358#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
359pub struct ProviderPayloadSchema {
360    pub harn_schema_name: String,
361    #[serde(default)]
362    pub json_schema: JsonValue,
363}
364
365impl ProviderPayloadSchema {
366    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
367        Self {
368            harn_schema_name: harn_schema_name.into(),
369            json_schema,
370        }
371    }
372
373    pub fn named(harn_schema_name: impl Into<String>) -> Self {
374        Self::new(harn_schema_name, JsonValue::Null)
375    }
376}
377
378impl Default for ProviderPayloadSchema {
379    fn default() -> Self {
380        Self::named("raw")
381    }
382}
383
384/// High-level transport kind a connector supports.
385#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
386#[serde(transparent)]
387pub struct TriggerKind(String);
388
389impl TriggerKind {
390    pub fn new(value: impl Into<String>) -> Self {
391        Self(value.into())
392    }
393
394    pub fn as_str(&self) -> &str {
395        self.0.as_str()
396    }
397}
398
399impl From<&str> for TriggerKind {
400    fn from(value: &str) -> Self {
401        Self::new(value)
402    }
403}
404
405impl From<String> for TriggerKind {
406    fn from(value: String) -> Self {
407        Self::new(value)
408    }
409}
410
411/// Future trigger manifest binding routed to a connector activation.
412#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
413pub struct TriggerBinding {
414    pub provider: ProviderId,
415    pub kind: TriggerKind,
416    pub binding_id: String,
417    #[serde(default)]
418    pub dedupe_key: Option<String>,
419    #[serde(default = "default_dedupe_retention_days")]
420    pub dedupe_retention_days: u32,
421    #[serde(default)]
422    pub config: JsonValue,
423}
424
425impl TriggerBinding {
426    pub fn new(
427        provider: ProviderId,
428        kind: impl Into<TriggerKind>,
429        binding_id: impl Into<String>,
430    ) -> Self {
431        Self {
432            provider,
433            kind: kind.into(),
434            binding_id: binding_id.into(),
435            dedupe_key: None,
436            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
437            config: JsonValue::Null,
438        }
439    }
440}
441
442fn default_dedupe_retention_days() -> u32 {
443    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
444}
445
446/// Small in-memory trigger-binding registry used to fan bindings into connectors.
447#[derive(Clone, Debug, Default)]
448pub struct TriggerRegistry {
449    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
450}
451
452impl TriggerRegistry {
453    pub fn register(&mut self, binding: TriggerBinding) {
454        self.bindings
455            .entry(binding.provider.clone())
456            .or_default()
457            .push(binding);
458    }
459
460    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
461        &self.bindings
462    }
463
464    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
465        self.bindings
466            .get(provider)
467            .map(Vec::as_slice)
468            .unwrap_or(&[])
469    }
470}
471
472/// Metadata returned from connector activation.
473#[derive(Clone, Debug, PartialEq, Eq)]
474pub struct ActivationHandle {
475    pub provider: ProviderId,
476    pub binding_count: usize,
477}
478
479impl ActivationHandle {
480    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
481        Self {
482            provider,
483            binding_count,
484        }
485    }
486}
487
488/// Provider-native inbound request payload preserved as raw bytes.
489#[derive(Clone, Debug, PartialEq)]
490pub struct RawInbound {
491    pub kind: String,
492    pub headers: BTreeMap<String, String>,
493    pub query: BTreeMap<String, String>,
494    pub body: Vec<u8>,
495    pub received_at: OffsetDateTime,
496    pub occurred_at: Option<OffsetDateTime>,
497    pub tenant_id: Option<TenantId>,
498    pub metadata: JsonValue,
499}
500
501impl RawInbound {
502    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
503        Self {
504            kind: kind.into(),
505            headers,
506            query: BTreeMap::new(),
507            body,
508            received_at: clock::now_utc(),
509            occurred_at: None,
510            tenant_id: None,
511            metadata: JsonValue::Null,
512        }
513    }
514
515    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
516        Ok(serde_json::from_slice(&self.body)?)
517    }
518}
519
520/// Token-bucket configuration shared across connector clients.
521#[derive(Clone, Copy, Debug, PartialEq, Eq)]
522pub struct RateLimitConfig {
523    pub capacity: u32,
524    pub refill_tokens: u32,
525    pub refill_interval: StdDuration,
526}
527
528impl Default for RateLimitConfig {
529    fn default() -> Self {
530        Self {
531            capacity: 60,
532            refill_tokens: 1,
533            refill_interval: StdDuration::from_secs(1),
534        }
535    }
536}
537
538#[derive(Clone, Debug)]
539struct TokenBucket {
540    tokens: f64,
541    last_refill: ClockInstant,
542}
543
544impl TokenBucket {
545    fn full(config: RateLimitConfig) -> Self {
546        Self {
547            tokens: config.capacity as f64,
548            last_refill: clock::instant_now(),
549        }
550    }
551
552    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
553        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
554        let rate = config.refill_tokens.max(1) as f64 / interval;
555        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
556        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
557        self.last_refill = now;
558    }
559
560    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
561        self.refill(config, now);
562        if self.tokens >= 1.0 {
563            self.tokens -= 1.0;
564            true
565        } else {
566            false
567        }
568    }
569
570    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
571        if self.tokens >= 1.0 {
572            return StdDuration::ZERO;
573        }
574        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
575        let rate = config.refill_tokens.max(1) as f64 / interval;
576        let missing = (1.0 - self.tokens).max(0.0);
577        StdDuration::from_secs_f64((missing / rate).max(0.001))
578    }
579}
580
581/// Shared per-provider, per-key token bucket factory for outbound connector clients.
582#[derive(Debug)]
583pub struct RateLimiterFactory {
584    config: RateLimitConfig,
585    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
586}
587
588impl RateLimiterFactory {
589    pub fn new(config: RateLimitConfig) -> Self {
590        Self {
591            config,
592            buckets: Mutex::new(HashMap::new()),
593        }
594    }
595
596    pub fn config(&self) -> RateLimitConfig {
597        self.config
598    }
599
600    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
601        ScopedRateLimiter {
602            factory: self,
603            provider: provider.clone(),
604            key: key.into(),
605        }
606    }
607
608    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
609        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
610        let bucket = buckets
611            .entry((provider.as_str().to_string(), key.to_string()))
612            .or_insert_with(|| TokenBucket::full(self.config));
613        bucket.try_acquire(self.config, clock::instant_now())
614    }
615
616    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
617        loop {
618            let wait = {
619                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
620                let bucket = buckets
621                    .entry((provider.as_str().to_string(), key.to_string()))
622                    .or_insert_with(|| TokenBucket::full(self.config));
623                if bucket.try_acquire(self.config, clock::instant_now()) {
624                    return;
625                }
626                bucket.wait_duration(self.config)
627            };
628            tokio::time::sleep(wait).await;
629        }
630    }
631}
632
633impl Default for RateLimiterFactory {
634    fn default() -> Self {
635        Self::new(RateLimitConfig::default())
636    }
637}
638
639/// Borrowed view onto a single provider/key rate-limit scope.
640#[derive(Clone, Debug)]
641pub struct ScopedRateLimiter<'a> {
642    factory: &'a RateLimiterFactory,
643    provider: ProviderId,
644    key: String,
645}
646
647impl<'a> ScopedRateLimiter<'a> {
648    pub fn try_acquire(&self) -> bool {
649        self.factory.try_acquire(&self.provider, &self.key)
650    }
651
652    pub async fn acquire(&self) {
653        self.factory.acquire(&self.provider, &self.key).await;
654    }
655}
656
657/// Runtime connector registry keyed by provider id.
658pub struct ConnectorRegistry {
659    connectors: BTreeMap<ProviderId, ConnectorHandle>,
660}
661
662impl ConnectorRegistry {
663    pub fn empty() -> Self {
664        Self {
665            connectors: BTreeMap::new(),
666        }
667    }
668
669    pub fn with_defaults() -> Self {
670        let mut registry = Self::empty();
671        for provider in registered_provider_metadata() {
672            registry
673                .register(default_connector_for_provider(&provider))
674                .expect("default connector registration should not fail");
675        }
676        registry
677    }
678
679    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
680        let provider = connector.provider_id().clone();
681        if self.connectors.contains_key(&provider) {
682            return Err(ConnectorError::DuplicateProvider(provider.0));
683        }
684        self.connectors
685            .insert(provider, Arc::new(AsyncMutex::new(connector)));
686        Ok(())
687    }
688
689    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
690        self.connectors.get(id).cloned()
691    }
692
693    pub fn list(&self) -> Vec<ProviderId> {
694        self.connectors.keys().cloned().collect()
695    }
696
697    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
698        for connector in self.connectors.values() {
699            connector.lock().await.init(ctx.clone()).await?;
700        }
701        Ok(())
702    }
703
704    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
705        let mut clients = BTreeMap::new();
706        for (provider, connector) in &self.connectors {
707            let client = connector.lock().await.client();
708            clients.insert(provider.clone(), client);
709        }
710        clients
711    }
712
713    pub async fn activate_all(
714        &self,
715        registry: &TriggerRegistry,
716    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
717        let mut handles = Vec::new();
718        for (provider, connector) in &self.connectors {
719            let bindings = registry.bindings_for(provider);
720            if bindings.is_empty() {
721                continue;
722            }
723            let connector = connector.lock().await;
724            handles.push(connector.activate(bindings).await?);
725        }
726        Ok(handles)
727    }
728}
729
730impl Default for ConnectorRegistry {
731    fn default() -> Self {
732        Self::with_defaults()
733    }
734}
735
736fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
737    // The provider catalog on main registers `github` with
738    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
739    // before a native connector existed the catalog auto-wired a
740    // GenericWebhookConnector. Now that #170 lands a first-class
741    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
742    // provider_id "github" here and return the native connector instead of a
743    // webhook-backed fallback. This keeps manifests that say
744    // `provider = "github"` pointed at the new connector without requiring
745    // users to switch to a distinct provider_id.
746    if provider.provider == "github" {
747        return Box::new(GitHubConnector::new());
748    }
749    if provider.provider == "slack" {
750        return Box::new(SlackConnector::new());
751    }
752    match &provider.runtime {
753        ProviderRuntimeMetadata::Builtin {
754            connector,
755            default_signature_variant,
756        } => match connector.as_str() {
757            "cron" => Box::new(CronConnector::new()),
758            "webhook" => {
759                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
760                    .expect("catalog webhook signature variant must be valid");
761                Box::new(GenericWebhookConnector::with_profile(
762                    WebhookProviderProfile::new(
763                        ProviderId::from(provider.provider.clone()),
764                        provider.schema_name.clone(),
765                        variant,
766                    ),
767                ))
768            }
769            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
770        },
771        ProviderRuntimeMetadata::Placeholder => {
772            Box::new(PlaceholderConnector::from_metadata(provider))
773        }
774    }
775}
776
777struct PlaceholderConnector {
778    provider_id: ProviderId,
779    kinds: Vec<TriggerKind>,
780    schema_name: String,
781}
782
783impl PlaceholderConnector {
784    fn from_metadata(metadata: &ProviderMetadata) -> Self {
785        Self {
786            provider_id: ProviderId::from(metadata.provider.clone()),
787            kinds: metadata
788                .kinds
789                .iter()
790                .cloned()
791                .map(TriggerKind::from)
792                .collect(),
793            schema_name: metadata.schema_name.clone(),
794        }
795    }
796}
797
798struct PlaceholderClient;
799
800#[async_trait]
801impl ConnectorClient for PlaceholderClient {
802    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
803        Err(ClientError::Other(format!(
804            "connector client method '{method}' is not implemented for this provider"
805        )))
806    }
807}
808
809#[async_trait]
810impl Connector for PlaceholderConnector {
811    fn provider_id(&self) -> &ProviderId {
812        &self.provider_id
813    }
814
815    fn kinds(&self) -> &[TriggerKind] {
816        &self.kinds
817    }
818
819    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
820        Ok(())
821    }
822
823    async fn activate(
824        &self,
825        bindings: &[TriggerBinding],
826    ) -> Result<ActivationHandle, ConnectorError> {
827        Ok(ActivationHandle::new(
828            self.provider_id.clone(),
829            bindings.len(),
830        ))
831    }
832
833    fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
834        Err(ConnectorError::Unsupported(format!(
835            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
836            self.provider_id.as_str()
837        )))
838    }
839
840    fn payload_schema(&self) -> ProviderPayloadSchema {
841        ProviderPayloadSchema::named(self.schema_name.clone())
842    }
843
844    fn client(&self) -> Arc<dyn ConnectorClient> {
845        Arc::new(PlaceholderClient)
846    }
847}
848
849pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
850    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
851        *slot.borrow_mut() = clients
852            .into_iter()
853            .map(|(provider, client)| (provider.as_str().to_string(), client))
854            .collect();
855    });
856}
857
858pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
859    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
860}
861
862pub fn clear_active_connector_clients() {
863    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
864}
865
866#[cfg(test)]
867mod tests {
868    use super::*;
869
870    use std::sync::atomic::{AtomicUsize, Ordering};
871
872    use async_trait::async_trait;
873    use serde_json::json;
874
875    struct NoopClient;
876
877    #[async_trait]
878    impl ConnectorClient for NoopClient {
879        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
880            Ok(json!({ "method": method }))
881        }
882    }
883
884    struct FakeConnector {
885        provider_id: ProviderId,
886        kinds: Vec<TriggerKind>,
887        activate_calls: Arc<AtomicUsize>,
888    }
889
890    impl FakeConnector {
891        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
892            Self {
893                provider_id: ProviderId::from(provider_id),
894                kinds: vec![TriggerKind::from("webhook")],
895                activate_calls,
896            }
897        }
898    }
899
900    #[async_trait]
901    impl Connector for FakeConnector {
902        fn provider_id(&self) -> &ProviderId {
903            &self.provider_id
904        }
905
906        fn kinds(&self) -> &[TriggerKind] {
907            &self.kinds
908        }
909
910        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
911            Ok(())
912        }
913
914        async fn activate(
915            &self,
916            bindings: &[TriggerBinding],
917        ) -> Result<ActivationHandle, ConnectorError> {
918            self.activate_calls.fetch_add(1, Ordering::SeqCst);
919            Ok(ActivationHandle::new(
920                self.provider_id.clone(),
921                bindings.len(),
922            ))
923        }
924
925        fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
926            Err(ConnectorError::Unsupported(
927                "not needed for registry tests".to_string(),
928            ))
929        }
930
931        fn payload_schema(&self) -> ProviderPayloadSchema {
932            ProviderPayloadSchema::named("FakePayload")
933        }
934
935        fn client(&self) -> Arc<dyn ConnectorClient> {
936            Arc::new(NoopClient)
937        }
938    }
939
940    #[tokio::test]
941    async fn connector_registry_rejects_duplicate_providers() {
942        let activate_calls = Arc::new(AtomicUsize::new(0));
943        let mut registry = ConnectorRegistry::empty();
944        registry
945            .register(Box::new(FakeConnector::new(
946                "github",
947                activate_calls.clone(),
948            )))
949            .unwrap();
950
951        let error = registry
952            .register(Box::new(FakeConnector::new("github", activate_calls)))
953            .unwrap_err();
954        assert!(matches!(
955            error,
956            ConnectorError::DuplicateProvider(provider) if provider == "github"
957        ));
958    }
959
960    #[tokio::test]
961    async fn connector_registry_activates_only_bound_connectors() {
962        let github_calls = Arc::new(AtomicUsize::new(0));
963        let slack_calls = Arc::new(AtomicUsize::new(0));
964        let mut registry = ConnectorRegistry::empty();
965        registry
966            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
967            .unwrap();
968        registry
969            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
970            .unwrap();
971
972        let mut trigger_registry = TriggerRegistry::default();
973        trigger_registry.register(TriggerBinding::new(
974            ProviderId::from("github"),
975            "webhook",
976            "github.push",
977        ));
978        trigger_registry.register(TriggerBinding::new(
979            ProviderId::from("github"),
980            "webhook",
981            "github.installation",
982        ));
983
984        let handles = registry.activate_all(&trigger_registry).await.unwrap();
985        assert_eq!(handles.len(), 1);
986        assert_eq!(handles[0].provider.as_str(), "github");
987        assert_eq!(handles[0].binding_count, 2);
988        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
989        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
990    }
991
992    #[test]
993    fn rate_limiter_scopes_tokens_by_provider_and_key() {
994        let factory = RateLimiterFactory::new(RateLimitConfig {
995            capacity: 1,
996            refill_tokens: 1,
997            refill_interval: StdDuration::from_secs(60),
998        });
999
1000        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1001        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1002        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1003        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1004    }
1005
1006    #[test]
1007    fn raw_inbound_json_body_preserves_raw_bytes() {
1008        let raw = RawInbound::new(
1009            "push",
1010            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1011            br#"{"ok":true}"#.to_vec(),
1012        );
1013
1014        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1015    }
1016
1017    #[test]
1018    fn connector_registry_lists_catalog_providers() {
1019        let registry = ConnectorRegistry::default();
1020        let providers = registry.list();
1021        assert!(providers.contains(&ProviderId::from("cron")));
1022        assert!(providers.contains(&ProviderId::from("github")));
1023        assert!(providers.contains(&ProviderId::from("webhook")));
1024    }
1025}