Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod hmac;
32pub mod linear;
33pub mod notion;
34pub mod slack;
35#[cfg(test)]
36pub(crate) mod test_util;
37pub mod webhook;
38
39pub use cron::{CatchupMode, CronConnector};
40pub use github::GitHubConnector;
41pub use hmac::{
42    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
43    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
44    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
45    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
46    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
47    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
48    SIGNATURE_VERIFY_AUDIT_TOPIC,
49};
50pub use linear::LinearConnector;
51pub use notion::{
52    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
53};
54pub use slack::SlackConnector;
55use webhook::WebhookProviderProfile;
56pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
57
58/// Shared owned handle to a connector instance registered with the runtime.
59pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
60
// Per-thread table of outbound connector clients, keyed by provider id string.
// It is replaced wholesale by `install_active_connector_clients`, read by
// `active_connector_client`, and emptied by `clear_active_connector_clients`.
thread_local! {
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
65
/// Provider implementation contract for inbound connectors.
///
/// Instances are stored behind a [`ConnectorHandle`] and looked up by provider
/// id through [`ConnectorRegistry`].
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable provider id such as `github`, `slack`, or `webhook`.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
    fn kinds(&self) -> &[TriggerKind];

    /// Called once per connector instance at orchestrator startup.
    ///
    /// Receives the shared [`ConnectorCtx`]; takes `&mut self` so the
    /// implementation can keep whatever it needs from the context.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activate the bindings relevant to this connector instance.
    ///
    /// Returns an [`ActivationHandle`] describing what was activated.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Stop connector-owned background work and flush any connector-local state.
    ///
    /// The provided default ignores the deadline and returns `Ok(())`.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
    fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Payload schema surfaced to future trigger-type narrowing.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Outbound API wrapper exposed to handlers.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
98
/// Result of running a normalized event through
/// [`postprocess_normalized_event`].
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event passed post-processing and is handed back to the caller.
    Ready(Box<TriggerEvent>),
    /// The inbox already held this event's dedupe key, so the event was dropped.
    DuplicateDropped,
}
104
105pub async fn postprocess_normalized_event(
106    inbox: &InboxIndex,
107    binding_id: &str,
108    dedupe_enabled: bool,
109    dedupe_ttl: StdDuration,
110    mut event: TriggerEvent,
111) -> Result<PostNormalizeOutcome, ConnectorError> {
112    if dedupe_enabled && !event.dedupe_claimed() {
113        if !inbox
114            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
115            .await?
116        {
117            return Ok(PostNormalizeOutcome::DuplicateDropped);
118        }
119        event.mark_dedupe_claimed();
120    }
121
122    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
123}
124
/// Outbound provider client interface used by connector-backed stdlib modules.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes the provider operation named `method` with JSON `args`.
    ///
    /// Returns the provider's JSON response, or a [`ClientError`] describing
    /// why the call could not be completed.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
130
/// Minimal outbound client errors shared by connector implementations.
///
/// Every variant wraps a human-readable message; the variant itself is the
/// machine-readable category.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    Other(String),
}

impl fmt::Display for ClientError {
    /// Renders only the wrapped message, whatever the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            ClientError::MethodNotFound(text) => text,
            ClientError::InvalidArgs(text) => text,
            ClientError::RateLimited(text) => text,
            ClientError::Transport(text) => text,
            ClientError::Other(text) => text,
        };
        // Delegate to `String`'s `Display` so width/padding flags keep working.
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for ClientError {}
154
/// Shared connector-layer errors.
///
/// String payloads are either an identifier that `Display` interpolates into a
/// fixed sentence (`DuplicateProvider`, `UnknownProvider`, `MissingHeader`) or
/// a complete message that `Display` prints verbatim.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider id is already registered.
    DuplicateProvider(String),
    /// Duplicate-delivery condition; carries a ready-made message.
    DuplicateDelivery(String),
    /// No connector is registered for this provider id.
    UnknownProvider(String),
    /// A required header (named by the payload) was absent.
    MissingHeader(String),
    /// Header `name` was present but unusable; `detail` explains why.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed; carries a ready-made message.
    InvalidSignature(String),
    /// `timestamp` fell outside the verification `window` around `now`.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// Stringified `serde_json::Error` (see the `From` impl).
    Json(String),
    /// Stringified secret-provider error (see the `From` impl).
    Secret(String),
    /// Stringified event-log error (see the `From` impl).
    EventLog(String),
    /// Error returned by an outbound connector client.
    Client(ClientError),
    /// The requested operation is not supported; carries a ready-made message.
    Unsupported(String),
    /// Activation failure; carries a ready-made message.
    Activation(String),
}
179
180impl ConnectorError {
181    pub fn invalid_signature(message: impl Into<String>) -> Self {
182        Self::InvalidSignature(message.into())
183    }
184}
185
186impl fmt::Display for ConnectorError {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        match self {
189            Self::DuplicateProvider(provider) => {
190                write!(f, "connector provider `{provider}` is already registered")
191            }
192            Self::DuplicateDelivery(message) => message.fmt(f),
193            Self::UnknownProvider(provider) => {
194                write!(f, "connector provider `{provider}` is not registered")
195            }
196            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
197            Self::InvalidHeader { name, detail } => {
198                write!(f, "invalid header `{name}`: {detail}")
199            }
200            Self::InvalidSignature(message)
201            | Self::Json(message)
202            | Self::Secret(message)
203            | Self::EventLog(message)
204            | Self::Unsupported(message)
205            | Self::Activation(message) => message.fmt(f),
206            Self::TimestampOutOfWindow {
207                timestamp,
208                now,
209                window,
210            } => write!(
211                f,
212                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
213            ),
214            Self::Client(error) => error.fmt(f),
215        }
216    }
217}
218
// No `source()` override: a wrapped `ClientError` is rendered through
// `Display` instead of being exposed as an error source.
impl std::error::Error for ConnectorError {}
220
221impl From<crate::event_log::LogError> for ConnectorError {
222    fn from(value: crate::event_log::LogError) -> Self {
223        Self::EventLog(value.to_string())
224    }
225}
226
227impl From<crate::secrets::SecretError> for ConnectorError {
228    fn from(value: crate::secrets::SecretError) -> Self {
229        Self::Secret(value.to_string())
230    }
231}
232
233impl From<serde_json::Error> for ConnectorError {
234    fn from(value: serde_json::Error) -> Self {
235        Self::Json(value.to_string())
236    }
237}
238
239impl From<ClientError> for ConnectorError {
240    fn from(value: ClientError) -> Self {
241        Self::Client(value)
242    }
243}
244
/// Startup context shared with connector instances.
///
/// Every field is an `Arc`, so cloning the context only bumps reference
/// counts; `ConnectorRegistry::init_all` hands each connector its own clone.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Shared event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Provider used to resolve secrets.
    pub secrets: Arc<dyn SecretProvider>,
    /// Inbox index used for delivery dedupe.
    pub inbox: Arc<InboxIndex>,
    /// Connector-local counters.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for per-provider/per-key outbound rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
254
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
///
/// A plain-value copy of [`MetricsRegistry`] taken by
/// [`MetricsRegistry::snapshot`]; field names mirror the registry's counters.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}

/// Shared metrics surface for connector-local counters and timings.
///
/// Every value is an `AtomicU64` accessed with relaxed ordering, so the
/// registry can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
}

impl MetricsRegistry {
    /// Adds one to `counter`.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current value of every counter into a plain struct.
    ///
    /// Each field is loaded independently, so the snapshot is not an atomic
    /// view across counters.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ConnectorMetricsSnapshot {
            inbox_claims_written: read(&self.inbox_claims_written),
            inbox_duplicates_rejected: read(&self.inbox_duplicates_rejected),
            inbox_fast_path_hits: read(&self.inbox_fast_path_hits),
            inbox_durable_hits: read(&self.inbox_durable_hits),
            inbox_expired_entries: read(&self.inbox_expired_entries),
            inbox_active_entries: read(&self.inbox_active_entries),
            linear_timestamp_rejections_total: read(&self.linear_timestamp_rejections_total),
            dispatch_succeeded_total: read(&self.dispatch_succeeded_total),
            dispatch_failed_total: read(&self.dispatch_failed_total),
            retry_scheduled_total: read(&self.retry_scheduled_total),
            slack_delivery_success_total: read(&self.slack_delivery_success_total),
            slack_delivery_failure_total: read(&self.slack_delivery_failure_total),
        }
    }

    /// Counts one inbox claim written.
    pub(crate) fn record_inbox_claim(&self) {
        Self::bump(&self.inbox_claims_written);
    }

    /// Counts a duplicate rejected on the fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        Self::bump(&self.inbox_duplicates_rejected);
        Self::bump(&self.inbox_fast_path_hits);
    }

    /// Counts a duplicate rejected on the durable path.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        Self::bump(&self.inbox_duplicates_rejected);
        Self::bump(&self.inbox_durable_hits);
    }

    /// Adds `count` expired inbox entries; a zero count is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count == 0 {
            return;
        }
        self.inbox_expired_entries
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Overwrites (rather than accumulates) the active-entry value.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        let value = count as u64;
        self.inbox_active_entries.store(value, Ordering::Relaxed);
    }

    /// Counts one Linear timestamp rejection.
    pub fn record_linear_timestamp_rejection(&self) {
        Self::bump(&self.linear_timestamp_rejections_total);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        Self::bump(&self.dispatch_succeeded_total);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        Self::bump(&self.dispatch_failed_total);
    }

    /// Counts one scheduled retry.
    pub fn record_retry_scheduled(&self) {
        Self::bump(&self.retry_scheduled_total);
    }

    /// Counts one successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        Self::bump(&self.slack_delivery_success_total);
    }

    /// Counts one failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        Self::bump(&self.slack_delivery_failure_total);
    }

    /// Renders a subset of the counters as Prometheus text.
    ///
    /// Each counter produces a `# TYPE <name> counter` line followed by
    /// `<name> <value>`. Two fixed Slack auto-disable gauges with hard-coded
    /// values are appended after the counters.
    pub fn render_prometheus(&self) -> String {
        let snap = self.snapshot();
        let counters: [(&str, u64); 7] = [
            (
                "connector_linear_timestamp_rejections_total",
                snap.linear_timestamp_rejections_total,
            ),
            ("dispatch_succeeded_total", snap.dispatch_succeeded_total),
            ("dispatch_failed_total", snap.dispatch_failed_total),
            ("inbox_duplicates_total", snap.inbox_duplicates_rejected),
            ("retry_scheduled_total", snap.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snap.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snap.slack_delivery_failure_total,
            ),
        ];

        let mut out = String::new();
        for (metric, total) in counters {
            out.push_str(&format!("# TYPE {metric} counter\n{metric} {total}\n"));
        }

        let fixed_gauges = [
            "# TYPE slack_events_auto_disable_min_success_ratio gauge\n",
            "slack_events_auto_disable_min_success_ratio 0.05\n",
            "# TYPE slack_events_auto_disable_min_events_per_hour gauge\n",
            "slack_events_auto_disable_min_events_per_hour 1000\n",
        ];
        for line in fixed_gauges {
            out.push_str(line);
        }
        out
    }
}
406
407/// Provider payload schema metadata exposed by a connector.
408#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
409pub struct ProviderPayloadSchema {
410    pub harn_schema_name: String,
411    #[serde(default)]
412    pub json_schema: JsonValue,
413}
414
415impl ProviderPayloadSchema {
416    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
417        Self {
418            harn_schema_name: harn_schema_name.into(),
419            json_schema,
420        }
421    }
422
423    pub fn named(harn_schema_name: impl Into<String>) -> Self {
424        Self::new(harn_schema_name, JsonValue::Null)
425    }
426}
427
428impl Default for ProviderPayloadSchema {
429    fn default() -> Self {
430        Self::named("raw")
431    }
432}
433
434/// High-level transport kind a connector supports.
435#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
436#[serde(transparent)]
437pub struct TriggerKind(String);
438
439impl TriggerKind {
440    pub fn new(value: impl Into<String>) -> Self {
441        Self(value.into())
442    }
443
444    pub fn as_str(&self) -> &str {
445        self.0.as_str()
446    }
447}
448
449impl From<&str> for TriggerKind {
450    fn from(value: &str) -> Self {
451        Self::new(value)
452    }
453}
454
455impl From<String> for TriggerKind {
456    fn from(value: String) -> Self {
457        Self::new(value)
458    }
459}
460
461/// Future trigger manifest binding routed to a connector activation.
462#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
463pub struct TriggerBinding {
464    pub provider: ProviderId,
465    pub kind: TriggerKind,
466    pub binding_id: String,
467    #[serde(default)]
468    pub dedupe_key: Option<String>,
469    #[serde(default = "default_dedupe_retention_days")]
470    pub dedupe_retention_days: u32,
471    #[serde(default)]
472    pub config: JsonValue,
473}
474
475impl TriggerBinding {
476    pub fn new(
477        provider: ProviderId,
478        kind: impl Into<TriggerKind>,
479        binding_id: impl Into<String>,
480    ) -> Self {
481        Self {
482            provider,
483            kind: kind.into(),
484            binding_id: binding_id.into(),
485            dedupe_key: None,
486            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
487            config: JsonValue::Null,
488        }
489    }
490}
491
492fn default_dedupe_retention_days() -> u32 {
493    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
494}
495
496/// Small in-memory trigger-binding registry used to fan bindings into connectors.
497#[derive(Clone, Debug, Default)]
498pub struct TriggerRegistry {
499    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
500}
501
502impl TriggerRegistry {
503    pub fn register(&mut self, binding: TriggerBinding) {
504        self.bindings
505            .entry(binding.provider.clone())
506            .or_default()
507            .push(binding);
508    }
509
510    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
511        &self.bindings
512    }
513
514    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
515        self.bindings
516            .get(provider)
517            .map(Vec::as_slice)
518            .unwrap_or(&[])
519    }
520}
521
522/// Metadata returned from connector activation.
523#[derive(Clone, Debug, PartialEq, Eq)]
524pub struct ActivationHandle {
525    pub provider: ProviderId,
526    pub binding_count: usize,
527}
528
529impl ActivationHandle {
530    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
531        Self {
532            provider,
533            binding_count,
534        }
535    }
536}
537
538/// Provider-native inbound request payload preserved as raw bytes.
539#[derive(Clone, Debug, PartialEq)]
540pub struct RawInbound {
541    pub kind: String,
542    pub headers: BTreeMap<String, String>,
543    pub query: BTreeMap<String, String>,
544    pub body: Vec<u8>,
545    pub received_at: OffsetDateTime,
546    pub occurred_at: Option<OffsetDateTime>,
547    pub tenant_id: Option<TenantId>,
548    pub metadata: JsonValue,
549}
550
551impl RawInbound {
552    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
553        Self {
554            kind: kind.into(),
555            headers,
556            query: BTreeMap::new(),
557            body,
558            received_at: clock::now_utc(),
559            occurred_at: None,
560            tenant_id: None,
561            metadata: JsonValue::Null,
562        }
563    }
564
565    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
566        Ok(serde_json::from_slice(&self.body)?)
567    }
568}
569
/// Token-bucket configuration shared across connector clients.
///
/// `capacity` is the maximum burst size; `refill_tokens` tokens are added back
/// per `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// A burst of 60 tokens, refilled at one token per second.
    fn default() -> Self {
        let refill_interval = StdDuration::from_secs(1);
        RateLimitConfig {
            refill_interval,
            refill_tokens: 1,
            capacity: 60,
        }
    }
}
587
588#[derive(Clone, Debug)]
589struct TokenBucket {
590    tokens: f64,
591    last_refill: ClockInstant,
592}
593
594impl TokenBucket {
595    fn full(config: RateLimitConfig) -> Self {
596        Self {
597            tokens: config.capacity as f64,
598            last_refill: clock::instant_now(),
599        }
600    }
601
602    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
603        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
604        let rate = config.refill_tokens.max(1) as f64 / interval;
605        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
606        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
607        self.last_refill = now;
608    }
609
610    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
611        self.refill(config, now);
612        if self.tokens >= 1.0 {
613            self.tokens -= 1.0;
614            true
615        } else {
616            false
617        }
618    }
619
620    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
621        if self.tokens >= 1.0 {
622            return StdDuration::ZERO;
623        }
624        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
625        let rate = config.refill_tokens.max(1) as f64 / interval;
626        let missing = (1.0 - self.tokens).max(0.0);
627        StdDuration::from_secs_f64((missing / rate).max(0.001))
628    }
629}
630
631/// Shared per-provider, per-key token bucket factory for outbound connector clients.
632#[derive(Debug)]
633pub struct RateLimiterFactory {
634    config: RateLimitConfig,
635    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
636}
637
638impl RateLimiterFactory {
639    pub fn new(config: RateLimitConfig) -> Self {
640        Self {
641            config,
642            buckets: Mutex::new(HashMap::new()),
643        }
644    }
645
646    pub fn config(&self) -> RateLimitConfig {
647        self.config
648    }
649
650    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
651        ScopedRateLimiter {
652            factory: self,
653            provider: provider.clone(),
654            key: key.into(),
655        }
656    }
657
658    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
659        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
660        let bucket = buckets
661            .entry((provider.as_str().to_string(), key.to_string()))
662            .or_insert_with(|| TokenBucket::full(self.config));
663        bucket.try_acquire(self.config, clock::instant_now())
664    }
665
666    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
667        loop {
668            let wait = {
669                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
670                let bucket = buckets
671                    .entry((provider.as_str().to_string(), key.to_string()))
672                    .or_insert_with(|| TokenBucket::full(self.config));
673                if bucket.try_acquire(self.config, clock::instant_now()) {
674                    return;
675                }
676                bucket.wait_duration(self.config)
677            };
678            tokio::time::sleep(wait).await;
679        }
680    }
681}
682
683impl Default for RateLimiterFactory {
684    fn default() -> Self {
685        Self::new(RateLimitConfig::default())
686    }
687}
688
689/// Borrowed view onto a single provider/key rate-limit scope.
690#[derive(Clone, Debug)]
691pub struct ScopedRateLimiter<'a> {
692    factory: &'a RateLimiterFactory,
693    provider: ProviderId,
694    key: String,
695}
696
697impl<'a> ScopedRateLimiter<'a> {
698    pub fn try_acquire(&self) -> bool {
699        self.factory.try_acquire(&self.provider, &self.key)
700    }
701
702    pub async fn acquire(&self) {
703        self.factory.acquire(&self.provider, &self.key).await;
704    }
705}
706
707/// Runtime connector registry keyed by provider id.
708pub struct ConnectorRegistry {
709    connectors: BTreeMap<ProviderId, ConnectorHandle>,
710}
711
712impl ConnectorRegistry {
713    pub fn empty() -> Self {
714        Self {
715            connectors: BTreeMap::new(),
716        }
717    }
718
719    pub fn with_defaults() -> Self {
720        let mut registry = Self::empty();
721        for provider in registered_provider_metadata() {
722            registry
723                .register(default_connector_for_provider(&provider))
724                .expect("default connector registration should not fail");
725        }
726        registry
727    }
728
729    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
730        let provider = connector.provider_id().clone();
731        if self.connectors.contains_key(&provider) {
732            return Err(ConnectorError::DuplicateProvider(provider.0));
733        }
734        self.connectors
735            .insert(provider, Arc::new(AsyncMutex::new(connector)));
736        Ok(())
737    }
738
739    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
740        self.connectors.get(id).cloned()
741    }
742
743    pub fn list(&self) -> Vec<ProviderId> {
744        self.connectors.keys().cloned().collect()
745    }
746
747    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
748        for connector in self.connectors.values() {
749            connector.lock().await.init(ctx.clone()).await?;
750        }
751        Ok(())
752    }
753
754    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
755        let mut clients = BTreeMap::new();
756        for (provider, connector) in &self.connectors {
757            let client = connector.lock().await.client();
758            clients.insert(provider.clone(), client);
759        }
760        clients
761    }
762
763    pub async fn activate_all(
764        &self,
765        registry: &TriggerRegistry,
766    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
767        let mut handles = Vec::new();
768        for (provider, connector) in &self.connectors {
769            let bindings = registry.bindings_for(provider);
770            if bindings.is_empty() {
771                continue;
772            }
773            let connector = connector.lock().await;
774            handles.push(connector.activate(bindings).await?);
775        }
776        Ok(handles)
777    }
778}
779
780impl Default for ConnectorRegistry {
781    fn default() -> Self {
782        Self::with_defaults()
783    }
784}
785
786fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
787    // The provider catalog on main registers `github` with
788    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
789    // before a native connector existed the catalog auto-wired a
790    // GenericWebhookConnector. Now that #170 lands a first-class
791    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
792    // provider_id "github" here and return the native connector instead of a
793    // webhook-backed fallback. This keeps manifests that say
794    // `provider = "github"` pointed at the new connector without requiring
795    // users to switch to a distinct provider_id.
796    if provider.provider == "github" {
797        return Box::new(GitHubConnector::new());
798    }
799    if provider.provider == "linear" {
800        return Box::new(LinearConnector::new());
801    }
802    if provider.provider == "slack" {
803        return Box::new(SlackConnector::new());
804    }
805    if provider.provider == "notion" {
806        return Box::new(NotionConnector::new());
807    }
808    match &provider.runtime {
809        ProviderRuntimeMetadata::Builtin {
810            connector,
811            default_signature_variant,
812        } => match connector.as_str() {
813            "cron" => Box::new(CronConnector::new()),
814            "webhook" => {
815                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
816                    .expect("catalog webhook signature variant must be valid");
817                Box::new(GenericWebhookConnector::with_profile(
818                    WebhookProviderProfile::new(
819                        ProviderId::from(provider.provider.clone()),
820                        provider.schema_name.clone(),
821                        variant,
822                    ),
823                ))
824            }
825            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
826        },
827        ProviderRuntimeMetadata::Placeholder => {
828            Box::new(PlaceholderConnector::from_metadata(provider))
829        }
830    }
831}
832
833struct PlaceholderConnector {
834    provider_id: ProviderId,
835    kinds: Vec<TriggerKind>,
836    schema_name: String,
837}
838
839impl PlaceholderConnector {
840    fn from_metadata(metadata: &ProviderMetadata) -> Self {
841        Self {
842            provider_id: ProviderId::from(metadata.provider.clone()),
843            kinds: metadata
844                .kinds
845                .iter()
846                .cloned()
847                .map(TriggerKind::from)
848                .collect(),
849            schema_name: metadata.schema_name.clone(),
850        }
851    }
852}
853
854struct PlaceholderClient;
855
856#[async_trait]
857impl ConnectorClient for PlaceholderClient {
858    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
859        Err(ClientError::Other(format!(
860            "connector client method '{method}' is not implemented for this provider"
861        )))
862    }
863}
864
865#[async_trait]
866impl Connector for PlaceholderConnector {
867    fn provider_id(&self) -> &ProviderId {
868        &self.provider_id
869    }
870
871    fn kinds(&self) -> &[TriggerKind] {
872        &self.kinds
873    }
874
875    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
876        Ok(())
877    }
878
879    async fn activate(
880        &self,
881        bindings: &[TriggerBinding],
882    ) -> Result<ActivationHandle, ConnectorError> {
883        Ok(ActivationHandle::new(
884            self.provider_id.clone(),
885            bindings.len(),
886        ))
887    }
888
889    fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
890        Err(ConnectorError::Unsupported(format!(
891            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
892            self.provider_id.as_str()
893        )))
894    }
895
896    fn payload_schema(&self) -> ProviderPayloadSchema {
897        ProviderPayloadSchema::named(self.schema_name.clone())
898    }
899
900    fn client(&self) -> Arc<dyn ConnectorClient> {
901        Arc::new(PlaceholderClient)
902    }
903}
904
905pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
906    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
907        *slot.borrow_mut() = clients
908            .into_iter()
909            .map(|(provider, client)| (provider.as_str().to_string(), client))
910            .collect();
911    });
912}
913
914pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
915    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
916}
917
918pub fn clear_active_connector_clients() {
919    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
920}
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925
926    use std::sync::atomic::{AtomicUsize, Ordering};
927
928    use async_trait::async_trait;
929    use serde_json::json;
930
931    struct NoopClient;
932
933    #[async_trait]
934    impl ConnectorClient for NoopClient {
935        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
936            Ok(json!({ "method": method }))
937        }
938    }
939
940    struct FakeConnector {
941        provider_id: ProviderId,
942        kinds: Vec<TriggerKind>,
943        activate_calls: Arc<AtomicUsize>,
944    }
945
946    impl FakeConnector {
947        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
948            Self {
949                provider_id: ProviderId::from(provider_id),
950                kinds: vec![TriggerKind::from("webhook")],
951                activate_calls,
952            }
953        }
954    }
955
956    #[async_trait]
957    impl Connector for FakeConnector {
958        fn provider_id(&self) -> &ProviderId {
959            &self.provider_id
960        }
961
962        fn kinds(&self) -> &[TriggerKind] {
963            &self.kinds
964        }
965
966        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
967            Ok(())
968        }
969
970        async fn activate(
971            &self,
972            bindings: &[TriggerBinding],
973        ) -> Result<ActivationHandle, ConnectorError> {
974            self.activate_calls.fetch_add(1, Ordering::SeqCst);
975            Ok(ActivationHandle::new(
976                self.provider_id.clone(),
977                bindings.len(),
978            ))
979        }
980
981        fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
982            Err(ConnectorError::Unsupported(
983                "not needed for registry tests".to_string(),
984            ))
985        }
986
987        fn payload_schema(&self) -> ProviderPayloadSchema {
988            ProviderPayloadSchema::named("FakePayload")
989        }
990
991        fn client(&self) -> Arc<dyn ConnectorClient> {
992            Arc::new(NoopClient)
993        }
994    }
995
996    #[tokio::test]
997    async fn connector_registry_rejects_duplicate_providers() {
998        let activate_calls = Arc::new(AtomicUsize::new(0));
999        let mut registry = ConnectorRegistry::empty();
1000        registry
1001            .register(Box::new(FakeConnector::new(
1002                "github",
1003                activate_calls.clone(),
1004            )))
1005            .unwrap();
1006
1007        let error = registry
1008            .register(Box::new(FakeConnector::new("github", activate_calls)))
1009            .unwrap_err();
1010        assert!(matches!(
1011            error,
1012            ConnectorError::DuplicateProvider(provider) if provider == "github"
1013        ));
1014    }
1015
1016    #[tokio::test]
1017    async fn connector_registry_activates_only_bound_connectors() {
1018        let github_calls = Arc::new(AtomicUsize::new(0));
1019        let slack_calls = Arc::new(AtomicUsize::new(0));
1020        let mut registry = ConnectorRegistry::empty();
1021        registry
1022            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1023            .unwrap();
1024        registry
1025            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1026            .unwrap();
1027
1028        let mut trigger_registry = TriggerRegistry::default();
1029        trigger_registry.register(TriggerBinding::new(
1030            ProviderId::from("github"),
1031            "webhook",
1032            "github.push",
1033        ));
1034        trigger_registry.register(TriggerBinding::new(
1035            ProviderId::from("github"),
1036            "webhook",
1037            "github.installation",
1038        ));
1039
1040        let handles = registry.activate_all(&trigger_registry).await.unwrap();
1041        assert_eq!(handles.len(), 1);
1042        assert_eq!(handles[0].provider.as_str(), "github");
1043        assert_eq!(handles[0].binding_count, 2);
1044        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
1045        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
1046    }
1047
1048    #[test]
1049    fn rate_limiter_scopes_tokens_by_provider_and_key() {
1050        let factory = RateLimiterFactory::new(RateLimitConfig {
1051            capacity: 1,
1052            refill_tokens: 1,
1053            refill_interval: StdDuration::from_secs(60),
1054        });
1055
1056        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1057        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1058        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1059        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1060    }
1061
1062    #[test]
1063    fn raw_inbound_json_body_preserves_raw_bytes() {
1064        let raw = RawInbound::new(
1065            "push",
1066            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1067            br#"{"ok":true}"#.to_vec(),
1068        );
1069
1070        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1071    }
1072
1073    #[test]
1074    fn connector_registry_lists_catalog_providers() {
1075        let registry = ConnectorRegistry::default();
1076        let providers = registry.list();
1077        assert!(providers.contains(&ProviderId::from("cron")));
1078        assert!(providers.contains(&ProviderId::from("github")));
1079        assert!(providers.contains(&ProviderId::from("webhook")));
1080    }
1081}