Skip to main content

harn_vm/connectors/
mod.rs

1//! Connector traits and shared helpers for inbound event-source providers.
2//!
3//! This lands in `harn-vm` for now because the current dependency surface
4//! (`EventLog`, `SecretProvider`, `TriggerEvent`) already lives here. If the
5//! connector ecosystem grows enough to justify extraction, the module can be
6//! split into a dedicated crate later without changing the high-level contract.
7
8use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25    registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26    ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod harn_module;
32pub mod hmac;
33pub mod linear;
34pub mod notion;
35pub mod slack;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod webhook;
39
40pub use cron::{CatchupMode, CronConnector};
41pub use github::GitHubConnector;
42pub use harn_module::{
43    load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
44};
45pub use hmac::{
46    verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
47    DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
48    DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
49    DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
50    DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
51    DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
52    SIGNATURE_VERIFY_AUDIT_TOPIC,
53};
54pub use linear::LinearConnector;
55pub use notion::{
56    load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
57};
58pub use slack::SlackConnector;
59use webhook::WebhookProviderProfile;
60pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
61
62/// Shared owned handle to a connector instance registered with the runtime.
63pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
64
65thread_local! {
66    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
67        RefCell::new(BTreeMap::new());
68}
69
70/// Provider implementation contract for inbound connectors.
71#[async_trait]
72pub trait Connector: Send + Sync {
73    /// Stable provider id such as `github`, `slack`, or `webhook`.
74    fn provider_id(&self) -> &ProviderId;
75
76    /// Trigger kinds this connector supports (`webhook`, `poll`, `stream`, ...).
77    fn kinds(&self) -> &[TriggerKind];
78
79    /// Called once per connector instance at orchestrator startup.
80    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
81
82    /// Activate the bindings relevant to this connector instance.
83    async fn activate(
84        &self,
85        bindings: &[TriggerBinding],
86    ) -> Result<ActivationHandle, ConnectorError>;
87
88    /// Stop connector-owned background work and flush any connector-local state.
89    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
90        Ok(())
91    }
92
93    /// Verify + normalize a provider-native inbound request into `TriggerEvent`.
94    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
95
96    /// Payload schema surfaced to future trigger-type narrowing.
97    fn payload_schema(&self) -> ProviderPayloadSchema;
98
99    /// Outbound API wrapper exposed to handlers.
100    fn client(&self) -> Arc<dyn ConnectorClient>;
101}
102
103#[derive(Clone, Debug, PartialEq)]
104pub enum PostNormalizeOutcome {
105    Ready(Box<TriggerEvent>),
106    DuplicateDropped,
107}
108
109pub async fn postprocess_normalized_event(
110    inbox: &InboxIndex,
111    binding_id: &str,
112    dedupe_enabled: bool,
113    dedupe_ttl: StdDuration,
114    mut event: TriggerEvent,
115) -> Result<PostNormalizeOutcome, ConnectorError> {
116    if dedupe_enabled && !event.dedupe_claimed() {
117        if !inbox
118            .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
119            .await?
120        {
121            return Ok(PostNormalizeOutcome::DuplicateDropped);
122        }
123        event.mark_dedupe_claimed();
124    }
125
126    Ok(PostNormalizeOutcome::Ready(Box::new(event)))
127}
128
129/// Outbound provider client interface used by connector-backed stdlib modules.
130#[async_trait]
131pub trait ConnectorClient: Send + Sync {
132    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
133}
134
/// Small set of outbound client failures shared by connector implementations.
///
/// Every variant carries a human-readable message; `Display` prints that
/// message verbatim without naming the variant.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    Other(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &str = match self {
            Self::MethodNotFound(text) => text,
            Self::InvalidArgs(text) => text,
            Self::RateLimited(text) => text,
            Self::Transport(text) => text,
            Self::Other(text) => text,
        };
        // `pad` is how `str` implements `Display`, so width/precision flags are honoured.
        f.pad(text)
    }
}

impl std::error::Error for ClientError {}
158
159/// Shared connector-layer errors.
160#[derive(Debug)]
161pub enum ConnectorError {
162    DuplicateProvider(String),
163    DuplicateDelivery(String),
164    UnknownProvider(String),
165    MissingHeader(String),
166    InvalidHeader {
167        name: String,
168        detail: String,
169    },
170    InvalidSignature(String),
171    TimestampOutOfWindow {
172        timestamp: OffsetDateTime,
173        now: OffsetDateTime,
174        window: time::Duration,
175    },
176    Json(String),
177    Secret(String),
178    EventLog(String),
179    HarnRuntime(String),
180    Client(ClientError),
181    Unsupported(String),
182    Activation(String),
183}
184
185impl ConnectorError {
186    pub fn invalid_signature(message: impl Into<String>) -> Self {
187        Self::InvalidSignature(message.into())
188    }
189}
190
191impl fmt::Display for ConnectorError {
192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193        match self {
194            Self::DuplicateProvider(provider) => {
195                write!(f, "connector provider `{provider}` is already registered")
196            }
197            Self::DuplicateDelivery(message) => message.fmt(f),
198            Self::UnknownProvider(provider) => {
199                write!(f, "connector provider `{provider}` is not registered")
200            }
201            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
202            Self::InvalidHeader { name, detail } => {
203                write!(f, "invalid header `{name}`: {detail}")
204            }
205            Self::InvalidSignature(message)
206            | Self::Json(message)
207            | Self::Secret(message)
208            | Self::EventLog(message)
209            | Self::HarnRuntime(message)
210            | Self::Unsupported(message)
211            | Self::Activation(message) => message.fmt(f),
212            Self::TimestampOutOfWindow {
213                timestamp,
214                now,
215                window,
216            } => write!(
217                f,
218                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
219            ),
220            Self::Client(error) => error.fmt(f),
221        }
222    }
223}
224
225impl std::error::Error for ConnectorError {}
226
227impl From<crate::event_log::LogError> for ConnectorError {
228    fn from(value: crate::event_log::LogError) -> Self {
229        Self::EventLog(value.to_string())
230    }
231}
232
233impl From<crate::secrets::SecretError> for ConnectorError {
234    fn from(value: crate::secrets::SecretError) -> Self {
235        Self::Secret(value.to_string())
236    }
237}
238
239impl From<serde_json::Error> for ConnectorError {
240    fn from(value: serde_json::Error) -> Self {
241        Self::Json(value.to_string())
242    }
243}
244
245impl From<ClientError> for ConnectorError {
246    fn from(value: ClientError) -> Self {
247        Self::Client(value)
248    }
249}
250
251/// Startup context shared with connector instances.
252#[derive(Clone)]
253pub struct ConnectorCtx {
254    pub event_log: Arc<AnyEventLog>,
255    pub secrets: Arc<dyn SecretProvider>,
256    pub inbox: Arc<InboxIndex>,
257    pub metrics: Arc<MetricsRegistry>,
258    pub rate_limiter: Arc<RateLimiterFactory>,
259}
260
/// Snapshot of connector-local metrics surfaced for tests and diagnostics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}

/// Shared metrics surface for connector-local counters and timings.
///
/// Built-in counters are lock-free atomics; ad-hoc counters live in a
/// mutex-protected map keyed by the caller-supplied name.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    custom_counters: Mutex<BTreeMap<String, u64>>,
}

impl MetricsRegistry {
    /// Reads every built-in counter into a plain-value snapshot.
    ///
    /// Each counter is loaded independently with relaxed ordering, so the
    /// snapshot is not a single atomic cut across all counters.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Counts one newly written inbox claim.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a rejected duplicate and attributes it to the fast-path counter.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a rejected duplicate and attributes it to the durable-hit counter.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the expired-entries counter; a zero count is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the active-entries value with `count`.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts one Linear timestamp rejection.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one scheduled retry.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `amount` to the ad-hoc counter called `name`, creating it on first use.
    ///
    /// A zero `amount` is ignored, so it never creates an entry. The running
    /// total saturates at `u64::MAX` instead of overflowing.
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self.lock_custom_counters();
        // Look up by `&str` first so the common "already exists" path does not allocate.
        match counters.get_mut(name) {
            Some(total) => *total = total.saturating_add(amount),
            None => {
                counters.insert(name.to_string(), amount);
            }
        }
    }

    /// Renders the exported metrics in Prometheus text exposition format.
    ///
    /// Output order: the fixed built-in counters, then the custom counters
    /// (exported as `connector_custom_<sanitized name>_total`), then two
    /// constant Slack auto-disable gauges.
    ///
    /// Custom names are sanitized by replacing every character other than
    /// ASCII alphanumerics and `_` with `_`. Distinct raw names that sanitize
    /// to the same metric name are summed (saturating) into one sample, so a
    /// metric family is never emitted twice.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            Self::push_counter(&mut rendered, name, value);
        }

        // Merge colliding sanitized names while keeping first-seen order, and
        // release the lock before doing the string rendering.
        let merged: Vec<(String, u64)> = {
            let custom_counters = self.lock_custom_counters();
            let mut merged: Vec<(String, u64)> = Vec::with_capacity(custom_counters.len());
            let mut slots: HashMap<String, usize> = HashMap::with_capacity(custom_counters.len());
            for (name, value) in custom_counters.iter() {
                let metric_name = Self::custom_metric_name(name);
                match slots.get(&metric_name) {
                    Some(&slot) => merged[slot].1 = merged[slot].1.saturating_add(*value),
                    None => {
                        slots.insert(metric_name.clone(), merged.len());
                        merged.push((metric_name, *value));
                    }
                }
            }
            merged
        };
        for (metric_name, value) in &merged {
            Self::push_counter(&mut rendered, metric_name, *value);
        }

        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        rendered
    }

    /// Locks the custom-counter map, recovering the data if the mutex was poisoned.
    ///
    /// Every critical section leaves the map in a consistent state, so a panic
    /// on another thread is no reason to make all later metrics calls panic too.
    fn lock_custom_counters(&self) -> std::sync::MutexGuard<'_, BTreeMap<String, u64>> {
        self.custom_counters
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Maps a raw custom counter name to its exported metric name.
    fn custom_metric_name(raw: &str) -> String {
        let sanitized: String = raw
            .chars()
            .map(|ch| {
                if ch.is_ascii_alphanumeric() || ch == '_' {
                    ch
                } else {
                    '_'
                }
            })
            .collect();
        format!("connector_custom_{sanitized}_total")
    }

    /// Appends one counter family (`# TYPE` line plus a single sample) to `out`.
    fn push_counter(out: &mut String, name: &str, value: u64) {
        out.push_str("# TYPE ");
        out.push_str(name);
        out.push_str(" counter\n");
        out.push_str(name);
        out.push(' ');
        out.push_str(&value.to_string());
        out.push('\n');
    }
}
447
448/// Provider payload schema metadata exposed by a connector.
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct ProviderPayloadSchema {
451    pub harn_schema_name: String,
452    #[serde(default)]
453    pub json_schema: JsonValue,
454}
455
456impl ProviderPayloadSchema {
457    pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
458        Self {
459            harn_schema_name: harn_schema_name.into(),
460            json_schema,
461        }
462    }
463
464    pub fn named(harn_schema_name: impl Into<String>) -> Self {
465        Self::new(harn_schema_name, JsonValue::Null)
466    }
467}
468
469impl Default for ProviderPayloadSchema {
470    fn default() -> Self {
471        Self::named("raw")
472    }
473}
474
475/// High-level transport kind a connector supports.
476#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
477#[serde(transparent)]
478pub struct TriggerKind(String);
479
480impl TriggerKind {
481    pub fn new(value: impl Into<String>) -> Self {
482        Self(value.into())
483    }
484
485    pub fn as_str(&self) -> &str {
486        self.0.as_str()
487    }
488}
489
490impl From<&str> for TriggerKind {
491    fn from(value: &str) -> Self {
492        Self::new(value)
493    }
494}
495
496impl From<String> for TriggerKind {
497    fn from(value: String) -> Self {
498        Self::new(value)
499    }
500}
501
502/// Future trigger manifest binding routed to a connector activation.
503#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
504pub struct TriggerBinding {
505    pub provider: ProviderId,
506    pub kind: TriggerKind,
507    pub binding_id: String,
508    #[serde(default)]
509    pub dedupe_key: Option<String>,
510    #[serde(default = "default_dedupe_retention_days")]
511    pub dedupe_retention_days: u32,
512    #[serde(default)]
513    pub config: JsonValue,
514}
515
516impl TriggerBinding {
517    pub fn new(
518        provider: ProviderId,
519        kind: impl Into<TriggerKind>,
520        binding_id: impl Into<String>,
521    ) -> Self {
522        Self {
523            provider,
524            kind: kind.into(),
525            binding_id: binding_id.into(),
526            dedupe_key: None,
527            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
528            config: JsonValue::Null,
529        }
530    }
531}
532
533fn default_dedupe_retention_days() -> u32 {
534    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
535}
536
537/// Small in-memory trigger-binding registry used to fan bindings into connectors.
538#[derive(Clone, Debug, Default)]
539pub struct TriggerRegistry {
540    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
541}
542
543impl TriggerRegistry {
544    pub fn register(&mut self, binding: TriggerBinding) {
545        self.bindings
546            .entry(binding.provider.clone())
547            .or_default()
548            .push(binding);
549    }
550
551    pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
552        &self.bindings
553    }
554
555    pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
556        self.bindings
557            .get(provider)
558            .map(Vec::as_slice)
559            .unwrap_or(&[])
560    }
561}
562
563/// Metadata returned from connector activation.
564#[derive(Clone, Debug, PartialEq, Eq)]
565pub struct ActivationHandle {
566    pub provider: ProviderId,
567    pub binding_count: usize,
568}
569
570impl ActivationHandle {
571    pub fn new(provider: ProviderId, binding_count: usize) -> Self {
572        Self {
573            provider,
574            binding_count,
575        }
576    }
577}
578
579/// Provider-native inbound request payload preserved as raw bytes.
580#[derive(Clone, Debug, PartialEq)]
581pub struct RawInbound {
582    pub kind: String,
583    pub headers: BTreeMap<String, String>,
584    pub query: BTreeMap<String, String>,
585    pub body: Vec<u8>,
586    pub received_at: OffsetDateTime,
587    pub occurred_at: Option<OffsetDateTime>,
588    pub tenant_id: Option<TenantId>,
589    pub metadata: JsonValue,
590}
591
592impl RawInbound {
593    pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
594        Self {
595            kind: kind.into(),
596            headers,
597            query: BTreeMap::new(),
598            body,
599            received_at: clock::now_utc(),
600            occurred_at: None,
601            tenant_id: None,
602            metadata: JsonValue::Null,
603        }
604    }
605
606    pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
607        Ok(serde_json::from_slice(&self.body)?)
608    }
609}
610
/// Token-bucket parameters shared by connector clients.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens a bucket can hold.
    pub capacity: u32,
    /// Tokens added per refill interval.
    pub refill_tokens: u32,
    /// Length of one refill interval.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Defaults to a 60-token bucket refilled by one token every second.
    fn default() -> Self {
        const DEFAULT_CAPACITY: u32 = 60;
        const DEFAULT_REFILL_TOKENS: u32 = 1;
        const DEFAULT_REFILL_INTERVAL: StdDuration = StdDuration::from_secs(1);

        RateLimitConfig {
            capacity: DEFAULT_CAPACITY,
            refill_tokens: DEFAULT_REFILL_TOKENS,
            refill_interval: DEFAULT_REFILL_INTERVAL,
        }
    }
}
628
629#[derive(Clone, Debug)]
630struct TokenBucket {
631    tokens: f64,
632    last_refill: ClockInstant,
633}
634
635impl TokenBucket {
636    fn full(config: RateLimitConfig) -> Self {
637        Self {
638            tokens: config.capacity as f64,
639            last_refill: clock::instant_now(),
640        }
641    }
642
643    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
644        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
645        let rate = config.refill_tokens.max(1) as f64 / interval;
646        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
647        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
648        self.last_refill = now;
649    }
650
651    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
652        self.refill(config, now);
653        if self.tokens >= 1.0 {
654            self.tokens -= 1.0;
655            true
656        } else {
657            false
658        }
659    }
660
661    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
662        if self.tokens >= 1.0 {
663            return StdDuration::ZERO;
664        }
665        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
666        let rate = config.refill_tokens.max(1) as f64 / interval;
667        let missing = (1.0 - self.tokens).max(0.0);
668        StdDuration::from_secs_f64((missing / rate).max(0.001))
669    }
670}
671
672/// Shared per-provider, per-key token bucket factory for outbound connector clients.
673#[derive(Debug)]
674pub struct RateLimiterFactory {
675    config: RateLimitConfig,
676    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
677}
678
679impl RateLimiterFactory {
680    pub fn new(config: RateLimitConfig) -> Self {
681        Self {
682            config,
683            buckets: Mutex::new(HashMap::new()),
684        }
685    }
686
687    pub fn config(&self) -> RateLimitConfig {
688        self.config
689    }
690
691    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
692        ScopedRateLimiter {
693            factory: self,
694            provider: provider.clone(),
695            key: key.into(),
696        }
697    }
698
699    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
700        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
701        let bucket = buckets
702            .entry((provider.as_str().to_string(), key.to_string()))
703            .or_insert_with(|| TokenBucket::full(self.config));
704        bucket.try_acquire(self.config, clock::instant_now())
705    }
706
707    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
708        loop {
709            let wait = {
710                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
711                let bucket = buckets
712                    .entry((provider.as_str().to_string(), key.to_string()))
713                    .or_insert_with(|| TokenBucket::full(self.config));
714                if bucket.try_acquire(self.config, clock::instant_now()) {
715                    return;
716                }
717                bucket.wait_duration(self.config)
718            };
719            tokio::time::sleep(wait).await;
720        }
721    }
722}
723
724impl Default for RateLimiterFactory {
725    fn default() -> Self {
726        Self::new(RateLimitConfig::default())
727    }
728}
729
730/// Borrowed view onto a single provider/key rate-limit scope.
731#[derive(Clone, Debug)]
732pub struct ScopedRateLimiter<'a> {
733    factory: &'a RateLimiterFactory,
734    provider: ProviderId,
735    key: String,
736}
737
738impl<'a> ScopedRateLimiter<'a> {
739    pub fn try_acquire(&self) -> bool {
740        self.factory.try_acquire(&self.provider, &self.key)
741    }
742
743    pub async fn acquire(&self) {
744        self.factory.acquire(&self.provider, &self.key).await;
745    }
746}
747
748/// Runtime connector registry keyed by provider id.
749pub struct ConnectorRegistry {
750    connectors: BTreeMap<ProviderId, ConnectorHandle>,
751}
752
753impl ConnectorRegistry {
754    pub fn empty() -> Self {
755        Self {
756            connectors: BTreeMap::new(),
757        }
758    }
759
760    pub fn with_defaults() -> Self {
761        let mut registry = Self::empty();
762        for provider in registered_provider_metadata() {
763            registry
764                .register(default_connector_for_provider(&provider))
765                .expect("default connector registration should not fail");
766        }
767        registry
768    }
769
770    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
771        let provider = connector.provider_id().clone();
772        if self.connectors.contains_key(&provider) {
773            return Err(ConnectorError::DuplicateProvider(provider.0));
774        }
775        self.connectors
776            .insert(provider, Arc::new(AsyncMutex::new(connector)));
777        Ok(())
778    }
779
780    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
781        self.connectors.get(id).cloned()
782    }
783
784    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
785        self.connectors.remove(id)
786    }
787
788    pub fn list(&self) -> Vec<ProviderId> {
789        self.connectors.keys().cloned().collect()
790    }
791
792    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
793        for connector in self.connectors.values() {
794            connector.lock().await.init(ctx.clone()).await?;
795        }
796        Ok(())
797    }
798
799    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
800        let mut clients = BTreeMap::new();
801        for (provider, connector) in &self.connectors {
802            let client = connector.lock().await.client();
803            clients.insert(provider.clone(), client);
804        }
805        clients
806    }
807
808    pub async fn activate_all(
809        &self,
810        registry: &TriggerRegistry,
811    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
812        let mut handles = Vec::new();
813        for (provider, connector) in &self.connectors {
814            let bindings = registry.bindings_for(provider);
815            if bindings.is_empty() {
816                continue;
817            }
818            let connector = connector.lock().await;
819            handles.push(connector.activate(bindings).await?);
820        }
821        Ok(handles)
822    }
823}
824
825impl Default for ConnectorRegistry {
826    fn default() -> Self {
827        Self::with_defaults()
828    }
829}
830
831fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
832    // The provider catalog on main registers `github` with
833    // ProviderRuntimeMetadata::Builtin { connector: "webhook", ... } so that
834    // before a native connector existed the catalog auto-wired a
835    // GenericWebhookConnector. Now that #170 lands a first-class
836    // GitHubConnector (inbound HMAC + GitHub App outbound), we short-circuit
837    // provider_id "github" here and return the native connector instead of a
838    // webhook-backed fallback. This keeps manifests that say
839    // `provider = "github"` pointed at the new connector without requiring
840    // users to switch to a distinct provider_id.
841    if provider.provider == "github" {
842        return Box::new(GitHubConnector::new());
843    }
844    if provider.provider == "linear" {
845        return Box::new(LinearConnector::new());
846    }
847    if provider.provider == "slack" {
848        return Box::new(SlackConnector::new());
849    }
850    if provider.provider == "notion" {
851        return Box::new(NotionConnector::new());
852    }
853    match &provider.runtime {
854        ProviderRuntimeMetadata::Builtin {
855            connector,
856            default_signature_variant,
857        } => match connector.as_str() {
858            "cron" => Box::new(CronConnector::new()),
859            "webhook" => {
860                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
861                    .expect("catalog webhook signature variant must be valid");
862                Box::new(GenericWebhookConnector::with_profile(
863                    WebhookProviderProfile::new(
864                        ProviderId::from(provider.provider.clone()),
865                        provider.schema_name.clone(),
866                        variant,
867                    ),
868                ))
869            }
870            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
871        },
872        ProviderRuntimeMetadata::Placeholder => {
873            Box::new(PlaceholderConnector::from_metadata(provider))
874        }
875    }
876}
877
878struct PlaceholderConnector {
879    provider_id: ProviderId,
880    kinds: Vec<TriggerKind>,
881    schema_name: String,
882}
883
884impl PlaceholderConnector {
885    fn from_metadata(metadata: &ProviderMetadata) -> Self {
886        Self {
887            provider_id: ProviderId::from(metadata.provider.clone()),
888            kinds: metadata
889                .kinds
890                .iter()
891                .cloned()
892                .map(TriggerKind::from)
893                .collect(),
894            schema_name: metadata.schema_name.clone(),
895        }
896    }
897}
898
899struct PlaceholderClient;
900
901#[async_trait]
902impl ConnectorClient for PlaceholderClient {
903    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
904        Err(ClientError::Other(format!(
905            "connector client method '{method}' is not implemented for this provider"
906        )))
907    }
908}
909
910#[async_trait]
911impl Connector for PlaceholderConnector {
912    fn provider_id(&self) -> &ProviderId {
913        &self.provider_id
914    }
915
916    fn kinds(&self) -> &[TriggerKind] {
917        &self.kinds
918    }
919
920    async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
921        Ok(())
922    }
923
924    async fn activate(
925        &self,
926        bindings: &[TriggerBinding],
927    ) -> Result<ActivationHandle, ConnectorError> {
928        Ok(ActivationHandle::new(
929            self.provider_id.clone(),
930            bindings.len(),
931        ))
932    }
933
934    async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
935        Err(ConnectorError::Unsupported(format!(
936            "provider '{}' is cataloged but does not have a concrete inbound connector yet",
937            self.provider_id.as_str()
938        )))
939    }
940
941    fn payload_schema(&self) -> ProviderPayloadSchema {
942        ProviderPayloadSchema::named(self.schema_name.clone())
943    }
944
945    fn client(&self) -> Arc<dyn ConnectorClient> {
946        Arc::new(PlaceholderClient)
947    }
948}
949
950pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
951    ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
952        *slot.borrow_mut() = clients
953            .into_iter()
954            .map(|(provider, client)| (provider.as_str().to_string(), client))
955            .collect();
956    });
957}
958
959pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
960    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
961}
962
963pub fn clear_active_connector_clients() {
964    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
965}
966
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client that answers every call with a JSON object echoing the method.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            let echoed = json!({ "method": method });
            Ok(echoed)
        }
    }

    /// Connector double that counts how many times `activate` is invoked.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        /// Creates a fake for `provider_id` with the single kind `"webhook"`,
        /// sharing `activate_calls` with the test that owns it.
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            let provider_id = ProviderId::from(provider_id);
            let kinds = vec![TriggerKind::from("webhook")];
            Self {
                provider_id,
                kinds,
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            self.kinds.as_slice()
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            // Record the call before building the handle so tests can count it.
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            let provider = self.provider_id.clone();
            Ok(ActivationHandle::new(provider, bindings.len()))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            let reason = "not needed for registry tests".to_string();
            Err(ConnectorError::Unsupported(reason))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    /// Registering the same provider id twice yields `DuplicateProvider`.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let counter = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let first = FakeConnector::new("github", Arc::clone(&counter));
        registry.register(Box::new(first)).unwrap();

        let second = FakeConnector::new("github", counter);
        let error = registry.register(Box::new(second)).unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    /// Only connectors that have trigger bindings are activated, and each is
    /// activated once with all of its bindings.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        for (name, calls) in [("github", &github_calls), ("slack", &slack_calls)] {
            registry
                .register(Box::new(FakeConnector::new(name, Arc::clone(calls))))
                .unwrap();
        }

        // Both bindings target github; slack is registered but unbound.
        let mut trigger_registry = TriggerRegistry::default();
        for event in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                event,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);

        let handle = &handles[0];
        assert_eq!(handle.provider.as_str(), "github");
        assert_eq!(handle.binding_count, 2);

        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    /// With a capacity of one token, a second acquire fails only for the same
    /// (provider, key) pair.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let config = RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        };
        let factory = RateLimiterFactory::new(config);

        let github = ProviderId::from("github");
        let slack = ProviderId::from("slack");

        assert!(factory.try_acquire(&github, "org:1"));
        assert!(!factory.try_acquire(&github, "org:1"));
        assert!(factory.try_acquire(&github, "org:2"));
        assert!(factory.try_acquire(&slack, "org:1"));
    }

    /// `json_body` parses the bytes handed to `RawInbound::new`.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers =
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]);
        let body = br#"{"ok":true}"#.to_vec();
        let raw = RawInbound::new("push", headers, body);

        let parsed = raw.json_body().unwrap();
        assert_eq!(parsed, json!({ "ok": true }));
    }

    /// The default registry lists the cron, github, and webhook providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();

        for name in ["cron", "github", "webhook"] {
            assert!(providers.contains(&ProviderId::from(name)));
        }
    }
}