1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod hmac;
32pub mod linear;
33pub mod notion;
34pub mod slack;
35#[cfg(test)]
36pub(crate) mod test_util;
37pub mod webhook;
38
39pub use cron::{CatchupMode, CronConnector};
40pub use github::GitHubConnector;
41pub use hmac::{
42 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
43 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
44 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
45 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
46 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
47 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
48 SIGNATURE_VERIFY_AUDIT_TOPIC,
49};
50pub use linear::LinearConnector;
51pub use notion::{
52 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
53};
54pub use slack::SlackConnector;
55use webhook::WebhookProviderProfile;
56pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
57
58pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
60
61thread_local! {
62 static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
63 RefCell::new(BTreeMap::new());
64}
65
66#[async_trait]
68pub trait Connector: Send + Sync {
69 fn provider_id(&self) -> &ProviderId;
71
72 fn kinds(&self) -> &[TriggerKind];
74
75 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
77
78 async fn activate(
80 &self,
81 bindings: &[TriggerBinding],
82 ) -> Result<ActivationHandle, ConnectorError>;
83
84 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
86 Ok(())
87 }
88
89 fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
91
92 fn payload_schema(&self) -> ProviderPayloadSchema;
94
95 fn client(&self) -> Arc<dyn ConnectorClient>;
97}
98
99#[derive(Clone, Debug, PartialEq)]
100pub enum PostNormalizeOutcome {
101 Ready(Box<TriggerEvent>),
102 DuplicateDropped,
103}
104
105pub async fn postprocess_normalized_event(
106 inbox: &InboxIndex,
107 binding_id: &str,
108 dedupe_enabled: bool,
109 dedupe_ttl: StdDuration,
110 mut event: TriggerEvent,
111) -> Result<PostNormalizeOutcome, ConnectorError> {
112 if dedupe_enabled && !event.dedupe_claimed() {
113 if !inbox
114 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
115 .await?
116 {
117 return Ok(PostNormalizeOutcome::DuplicateDropped);
118 }
119 event.mark_dedupe_claimed();
120 }
121
122 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
123}
124
125#[async_trait]
127pub trait ConnectorClient: Send + Sync {
128 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
129}
130
131#[derive(Clone, Debug, PartialEq, Eq)]
133pub enum ClientError {
134 MethodNotFound(String),
135 InvalidArgs(String),
136 RateLimited(String),
137 Transport(String),
138 Other(String),
139}
140
141impl fmt::Display for ClientError {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 match self {
144 Self::MethodNotFound(message)
145 | Self::InvalidArgs(message)
146 | Self::RateLimited(message)
147 | Self::Transport(message)
148 | Self::Other(message) => message.fmt(f),
149 }
150 }
151}
152
153impl std::error::Error for ClientError {}
154
155#[derive(Debug)]
157pub enum ConnectorError {
158 DuplicateProvider(String),
159 DuplicateDelivery(String),
160 UnknownProvider(String),
161 MissingHeader(String),
162 InvalidHeader {
163 name: String,
164 detail: String,
165 },
166 InvalidSignature(String),
167 TimestampOutOfWindow {
168 timestamp: OffsetDateTime,
169 now: OffsetDateTime,
170 window: time::Duration,
171 },
172 Json(String),
173 Secret(String),
174 EventLog(String),
175 Client(ClientError),
176 Unsupported(String),
177 Activation(String),
178}
179
180impl ConnectorError {
181 pub fn invalid_signature(message: impl Into<String>) -> Self {
182 Self::InvalidSignature(message.into())
183 }
184}
185
186impl fmt::Display for ConnectorError {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::DuplicateProvider(provider) => {
190 write!(f, "connector provider `{provider}` is already registered")
191 }
192 Self::DuplicateDelivery(message) => message.fmt(f),
193 Self::UnknownProvider(provider) => {
194 write!(f, "connector provider `{provider}` is not registered")
195 }
196 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
197 Self::InvalidHeader { name, detail } => {
198 write!(f, "invalid header `{name}`: {detail}")
199 }
200 Self::InvalidSignature(message)
201 | Self::Json(message)
202 | Self::Secret(message)
203 | Self::EventLog(message)
204 | Self::Unsupported(message)
205 | Self::Activation(message) => message.fmt(f),
206 Self::TimestampOutOfWindow {
207 timestamp,
208 now,
209 window,
210 } => write!(
211 f,
212 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
213 ),
214 Self::Client(error) => error.fmt(f),
215 }
216 }
217}
218
219impl std::error::Error for ConnectorError {}
220
221impl From<crate::event_log::LogError> for ConnectorError {
222 fn from(value: crate::event_log::LogError) -> Self {
223 Self::EventLog(value.to_string())
224 }
225}
226
227impl From<crate::secrets::SecretError> for ConnectorError {
228 fn from(value: crate::secrets::SecretError) -> Self {
229 Self::Secret(value.to_string())
230 }
231}
232
233impl From<serde_json::Error> for ConnectorError {
234 fn from(value: serde_json::Error) -> Self {
235 Self::Json(value.to_string())
236 }
237}
238
239impl From<ClientError> for ConnectorError {
240 fn from(value: ClientError) -> Self {
241 Self::Client(value)
242 }
243}
244
245#[derive(Clone)]
247pub struct ConnectorCtx {
248 pub event_log: Arc<AnyEventLog>,
249 pub secrets: Arc<dyn SecretProvider>,
250 pub inbox: Arc<InboxIndex>,
251 pub metrics: Arc<MetricsRegistry>,
252 pub rate_limiter: Arc<RateLimiterFactory>,
253}
254
255#[derive(Clone, Debug, Default, PartialEq, Eq)]
257pub struct ConnectorMetricsSnapshot {
258 pub inbox_claims_written: u64,
259 pub inbox_duplicates_rejected: u64,
260 pub inbox_fast_path_hits: u64,
261 pub inbox_durable_hits: u64,
262 pub inbox_expired_entries: u64,
263 pub inbox_active_entries: u64,
264 pub linear_timestamp_rejections_total: u64,
265 pub dispatch_succeeded_total: u64,
266 pub dispatch_failed_total: u64,
267 pub retry_scheduled_total: u64,
268 pub slack_delivery_success_total: u64,
269 pub slack_delivery_failure_total: u64,
270}
271
272#[derive(Debug, Default)]
274pub struct MetricsRegistry {
275 inbox_claims_written: AtomicU64,
276 inbox_duplicates_rejected: AtomicU64,
277 inbox_fast_path_hits: AtomicU64,
278 inbox_durable_hits: AtomicU64,
279 inbox_expired_entries: AtomicU64,
280 inbox_active_entries: AtomicU64,
281 linear_timestamp_rejections_total: AtomicU64,
282 dispatch_succeeded_total: AtomicU64,
283 dispatch_failed_total: AtomicU64,
284 retry_scheduled_total: AtomicU64,
285 slack_delivery_success_total: AtomicU64,
286 slack_delivery_failure_total: AtomicU64,
287}
288
289impl MetricsRegistry {
290 pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
291 ConnectorMetricsSnapshot {
292 inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
293 inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
294 inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
295 inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
296 inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
297 inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
298 linear_timestamp_rejections_total: self
299 .linear_timestamp_rejections_total
300 .load(Ordering::Relaxed),
301 dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
302 dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
303 retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
304 slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
305 slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
306 }
307 }
308
309 pub(crate) fn record_inbox_claim(&self) {
310 self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
311 }
312
313 pub(crate) fn record_inbox_duplicate_fast_path(&self) {
314 self.inbox_duplicates_rejected
315 .fetch_add(1, Ordering::Relaxed);
316 self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
317 }
318
319 pub(crate) fn record_inbox_duplicate_durable(&self) {
320 self.inbox_duplicates_rejected
321 .fetch_add(1, Ordering::Relaxed);
322 self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
323 }
324
325 pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
326 if count > 0 {
327 self.inbox_expired_entries
328 .fetch_add(count, Ordering::Relaxed);
329 }
330 }
331
332 pub(crate) fn set_inbox_active_entries(&self, count: usize) {
333 self.inbox_active_entries
334 .store(count as u64, Ordering::Relaxed);
335 }
336
337 pub fn record_linear_timestamp_rejection(&self) {
338 self.linear_timestamp_rejections_total
339 .fetch_add(1, Ordering::Relaxed);
340 }
341
342 pub fn record_dispatch_succeeded(&self) {
343 self.dispatch_succeeded_total
344 .fetch_add(1, Ordering::Relaxed);
345 }
346
347 pub fn record_dispatch_failed(&self) {
348 self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
349 }
350
351 pub fn record_retry_scheduled(&self) {
352 self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
353 }
354
355 pub fn record_slack_delivery_success(&self) {
356 self.slack_delivery_success_total
357 .fetch_add(1, Ordering::Relaxed);
358 }
359
360 pub fn record_slack_delivery_failure(&self) {
361 self.slack_delivery_failure_total
362 .fetch_add(1, Ordering::Relaxed);
363 }
364
365 pub fn render_prometheus(&self) -> String {
366 let snapshot = self.snapshot();
367 let counters = [
368 (
369 "connector_linear_timestamp_rejections_total",
370 snapshot.linear_timestamp_rejections_total,
371 ),
372 (
373 "dispatch_succeeded_total",
374 snapshot.dispatch_succeeded_total,
375 ),
376 ("dispatch_failed_total", snapshot.dispatch_failed_total),
377 ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
378 ("retry_scheduled_total", snapshot.retry_scheduled_total),
379 (
380 "slack_events_delivery_success_total",
381 snapshot.slack_delivery_success_total,
382 ),
383 (
384 "slack_events_delivery_failure_total",
385 snapshot.slack_delivery_failure_total,
386 ),
387 ];
388
389 let mut rendered = String::new();
390 for (name, value) in counters {
391 rendered.push_str("# TYPE ");
392 rendered.push_str(name);
393 rendered.push_str(" counter\n");
394 rendered.push_str(name);
395 rendered.push(' ');
396 rendered.push_str(&value.to_string());
397 rendered.push('\n');
398 }
399 rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
400 rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
401 rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
402 rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
403 rendered
404 }
405}
406
407#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
409pub struct ProviderPayloadSchema {
410 pub harn_schema_name: String,
411 #[serde(default)]
412 pub json_schema: JsonValue,
413}
414
415impl ProviderPayloadSchema {
416 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
417 Self {
418 harn_schema_name: harn_schema_name.into(),
419 json_schema,
420 }
421 }
422
423 pub fn named(harn_schema_name: impl Into<String>) -> Self {
424 Self::new(harn_schema_name, JsonValue::Null)
425 }
426}
427
428impl Default for ProviderPayloadSchema {
429 fn default() -> Self {
430 Self::named("raw")
431 }
432}
433
434#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
436#[serde(transparent)]
437pub struct TriggerKind(String);
438
439impl TriggerKind {
440 pub fn new(value: impl Into<String>) -> Self {
441 Self(value.into())
442 }
443
444 pub fn as_str(&self) -> &str {
445 self.0.as_str()
446 }
447}
448
449impl From<&str> for TriggerKind {
450 fn from(value: &str) -> Self {
451 Self::new(value)
452 }
453}
454
455impl From<String> for TriggerKind {
456 fn from(value: String) -> Self {
457 Self::new(value)
458 }
459}
460
461#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
463pub struct TriggerBinding {
464 pub provider: ProviderId,
465 pub kind: TriggerKind,
466 pub binding_id: String,
467 #[serde(default)]
468 pub dedupe_key: Option<String>,
469 #[serde(default = "default_dedupe_retention_days")]
470 pub dedupe_retention_days: u32,
471 #[serde(default)]
472 pub config: JsonValue,
473}
474
475impl TriggerBinding {
476 pub fn new(
477 provider: ProviderId,
478 kind: impl Into<TriggerKind>,
479 binding_id: impl Into<String>,
480 ) -> Self {
481 Self {
482 provider,
483 kind: kind.into(),
484 binding_id: binding_id.into(),
485 dedupe_key: None,
486 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
487 config: JsonValue::Null,
488 }
489 }
490}
491
492fn default_dedupe_retention_days() -> u32 {
493 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
494}
495
496#[derive(Clone, Debug, Default)]
498pub struct TriggerRegistry {
499 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
500}
501
502impl TriggerRegistry {
503 pub fn register(&mut self, binding: TriggerBinding) {
504 self.bindings
505 .entry(binding.provider.clone())
506 .or_default()
507 .push(binding);
508 }
509
510 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
511 &self.bindings
512 }
513
514 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
515 self.bindings
516 .get(provider)
517 .map(Vec::as_slice)
518 .unwrap_or(&[])
519 }
520}
521
522#[derive(Clone, Debug, PartialEq, Eq)]
524pub struct ActivationHandle {
525 pub provider: ProviderId,
526 pub binding_count: usize,
527}
528
529impl ActivationHandle {
530 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
531 Self {
532 provider,
533 binding_count,
534 }
535 }
536}
537
538#[derive(Clone, Debug, PartialEq)]
540pub struct RawInbound {
541 pub kind: String,
542 pub headers: BTreeMap<String, String>,
543 pub query: BTreeMap<String, String>,
544 pub body: Vec<u8>,
545 pub received_at: OffsetDateTime,
546 pub occurred_at: Option<OffsetDateTime>,
547 pub tenant_id: Option<TenantId>,
548 pub metadata: JsonValue,
549}
550
551impl RawInbound {
552 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
553 Self {
554 kind: kind.into(),
555 headers,
556 query: BTreeMap::new(),
557 body,
558 received_at: clock::now_utc(),
559 occurred_at: None,
560 tenant_id: None,
561 metadata: JsonValue::Null,
562 }
563 }
564
565 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
566 Ok(serde_json::from_slice(&self.body)?)
567 }
568}
569
570#[derive(Clone, Copy, Debug, PartialEq, Eq)]
572pub struct RateLimitConfig {
573 pub capacity: u32,
574 pub refill_tokens: u32,
575 pub refill_interval: StdDuration,
576}
577
578impl Default for RateLimitConfig {
579 fn default() -> Self {
580 Self {
581 capacity: 60,
582 refill_tokens: 1,
583 refill_interval: StdDuration::from_secs(1),
584 }
585 }
586}
587
588#[derive(Clone, Debug)]
589struct TokenBucket {
590 tokens: f64,
591 last_refill: ClockInstant,
592}
593
594impl TokenBucket {
595 fn full(config: RateLimitConfig) -> Self {
596 Self {
597 tokens: config.capacity as f64,
598 last_refill: clock::instant_now(),
599 }
600 }
601
602 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
603 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
604 let rate = config.refill_tokens.max(1) as f64 / interval;
605 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
606 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
607 self.last_refill = now;
608 }
609
610 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
611 self.refill(config, now);
612 if self.tokens >= 1.0 {
613 self.tokens -= 1.0;
614 true
615 } else {
616 false
617 }
618 }
619
620 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
621 if self.tokens >= 1.0 {
622 return StdDuration::ZERO;
623 }
624 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
625 let rate = config.refill_tokens.max(1) as f64 / interval;
626 let missing = (1.0 - self.tokens).max(0.0);
627 StdDuration::from_secs_f64((missing / rate).max(0.001))
628 }
629}
630
631#[derive(Debug)]
633pub struct RateLimiterFactory {
634 config: RateLimitConfig,
635 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
636}
637
638impl RateLimiterFactory {
639 pub fn new(config: RateLimitConfig) -> Self {
640 Self {
641 config,
642 buckets: Mutex::new(HashMap::new()),
643 }
644 }
645
646 pub fn config(&self) -> RateLimitConfig {
647 self.config
648 }
649
650 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
651 ScopedRateLimiter {
652 factory: self,
653 provider: provider.clone(),
654 key: key.into(),
655 }
656 }
657
658 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
659 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
660 let bucket = buckets
661 .entry((provider.as_str().to_string(), key.to_string()))
662 .or_insert_with(|| TokenBucket::full(self.config));
663 bucket.try_acquire(self.config, clock::instant_now())
664 }
665
666 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
667 loop {
668 let wait = {
669 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
670 let bucket = buckets
671 .entry((provider.as_str().to_string(), key.to_string()))
672 .or_insert_with(|| TokenBucket::full(self.config));
673 if bucket.try_acquire(self.config, clock::instant_now()) {
674 return;
675 }
676 bucket.wait_duration(self.config)
677 };
678 tokio::time::sleep(wait).await;
679 }
680 }
681}
682
683impl Default for RateLimiterFactory {
684 fn default() -> Self {
685 Self::new(RateLimitConfig::default())
686 }
687}
688
689#[derive(Clone, Debug)]
691pub struct ScopedRateLimiter<'a> {
692 factory: &'a RateLimiterFactory,
693 provider: ProviderId,
694 key: String,
695}
696
697impl<'a> ScopedRateLimiter<'a> {
698 pub fn try_acquire(&self) -> bool {
699 self.factory.try_acquire(&self.provider, &self.key)
700 }
701
702 pub async fn acquire(&self) {
703 self.factory.acquire(&self.provider, &self.key).await;
704 }
705}
706
707pub struct ConnectorRegistry {
709 connectors: BTreeMap<ProviderId, ConnectorHandle>,
710}
711
712impl ConnectorRegistry {
713 pub fn empty() -> Self {
714 Self {
715 connectors: BTreeMap::new(),
716 }
717 }
718
719 pub fn with_defaults() -> Self {
720 let mut registry = Self::empty();
721 for provider in registered_provider_metadata() {
722 registry
723 .register(default_connector_for_provider(&provider))
724 .expect("default connector registration should not fail");
725 }
726 registry
727 }
728
729 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
730 let provider = connector.provider_id().clone();
731 if self.connectors.contains_key(&provider) {
732 return Err(ConnectorError::DuplicateProvider(provider.0));
733 }
734 self.connectors
735 .insert(provider, Arc::new(AsyncMutex::new(connector)));
736 Ok(())
737 }
738
739 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
740 self.connectors.get(id).cloned()
741 }
742
743 pub fn list(&self) -> Vec<ProviderId> {
744 self.connectors.keys().cloned().collect()
745 }
746
747 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
748 for connector in self.connectors.values() {
749 connector.lock().await.init(ctx.clone()).await?;
750 }
751 Ok(())
752 }
753
754 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
755 let mut clients = BTreeMap::new();
756 for (provider, connector) in &self.connectors {
757 let client = connector.lock().await.client();
758 clients.insert(provider.clone(), client);
759 }
760 clients
761 }
762
763 pub async fn activate_all(
764 &self,
765 registry: &TriggerRegistry,
766 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
767 let mut handles = Vec::new();
768 for (provider, connector) in &self.connectors {
769 let bindings = registry.bindings_for(provider);
770 if bindings.is_empty() {
771 continue;
772 }
773 let connector = connector.lock().await;
774 handles.push(connector.activate(bindings).await?);
775 }
776 Ok(handles)
777 }
778}
779
780impl Default for ConnectorRegistry {
781 fn default() -> Self {
782 Self::with_defaults()
783 }
784}
785
786fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
787 if provider.provider == "github" {
797 return Box::new(GitHubConnector::new());
798 }
799 if provider.provider == "linear" {
800 return Box::new(LinearConnector::new());
801 }
802 if provider.provider == "slack" {
803 return Box::new(SlackConnector::new());
804 }
805 if provider.provider == "notion" {
806 return Box::new(NotionConnector::new());
807 }
808 match &provider.runtime {
809 ProviderRuntimeMetadata::Builtin {
810 connector,
811 default_signature_variant,
812 } => match connector.as_str() {
813 "cron" => Box::new(CronConnector::new()),
814 "webhook" => {
815 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
816 .expect("catalog webhook signature variant must be valid");
817 Box::new(GenericWebhookConnector::with_profile(
818 WebhookProviderProfile::new(
819 ProviderId::from(provider.provider.clone()),
820 provider.schema_name.clone(),
821 variant,
822 ),
823 ))
824 }
825 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
826 },
827 ProviderRuntimeMetadata::Placeholder => {
828 Box::new(PlaceholderConnector::from_metadata(provider))
829 }
830 }
831}
832
833struct PlaceholderConnector {
834 provider_id: ProviderId,
835 kinds: Vec<TriggerKind>,
836 schema_name: String,
837}
838
839impl PlaceholderConnector {
840 fn from_metadata(metadata: &ProviderMetadata) -> Self {
841 Self {
842 provider_id: ProviderId::from(metadata.provider.clone()),
843 kinds: metadata
844 .kinds
845 .iter()
846 .cloned()
847 .map(TriggerKind::from)
848 .collect(),
849 schema_name: metadata.schema_name.clone(),
850 }
851 }
852}
853
854struct PlaceholderClient;
855
856#[async_trait]
857impl ConnectorClient for PlaceholderClient {
858 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
859 Err(ClientError::Other(format!(
860 "connector client method '{method}' is not implemented for this provider"
861 )))
862 }
863}
864
865#[async_trait]
866impl Connector for PlaceholderConnector {
867 fn provider_id(&self) -> &ProviderId {
868 &self.provider_id
869 }
870
871 fn kinds(&self) -> &[TriggerKind] {
872 &self.kinds
873 }
874
875 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
876 Ok(())
877 }
878
879 async fn activate(
880 &self,
881 bindings: &[TriggerBinding],
882 ) -> Result<ActivationHandle, ConnectorError> {
883 Ok(ActivationHandle::new(
884 self.provider_id.clone(),
885 bindings.len(),
886 ))
887 }
888
889 fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
890 Err(ConnectorError::Unsupported(format!(
891 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
892 self.provider_id.as_str()
893 )))
894 }
895
896 fn payload_schema(&self) -> ProviderPayloadSchema {
897 ProviderPayloadSchema::named(self.schema_name.clone())
898 }
899
900 fn client(&self) -> Arc<dyn ConnectorClient> {
901 Arc::new(PlaceholderClient)
902 }
903}
904
905pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
906 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
907 *slot.borrow_mut() = clients
908 .into_iter()
909 .map(|(provider, client)| (provider.as_str().to_string(), client))
910 .collect();
911 });
912}
913
914pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
915 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
916}
917
918pub fn clear_active_connector_clients() {
919 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
920}
921
922#[cfg(test)]
923mod tests {
924 use super::*;
925
926 use std::sync::atomic::{AtomicUsize, Ordering};
927
928 use async_trait::async_trait;
929 use serde_json::json;
930
931 struct NoopClient;
932
933 #[async_trait]
934 impl ConnectorClient for NoopClient {
935 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
936 Ok(json!({ "method": method }))
937 }
938 }
939
940 struct FakeConnector {
941 provider_id: ProviderId,
942 kinds: Vec<TriggerKind>,
943 activate_calls: Arc<AtomicUsize>,
944 }
945
946 impl FakeConnector {
947 fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
948 Self {
949 provider_id: ProviderId::from(provider_id),
950 kinds: vec![TriggerKind::from("webhook")],
951 activate_calls,
952 }
953 }
954 }
955
956 #[async_trait]
957 impl Connector for FakeConnector {
958 fn provider_id(&self) -> &ProviderId {
959 &self.provider_id
960 }
961
962 fn kinds(&self) -> &[TriggerKind] {
963 &self.kinds
964 }
965
966 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
967 Ok(())
968 }
969
970 async fn activate(
971 &self,
972 bindings: &[TriggerBinding],
973 ) -> Result<ActivationHandle, ConnectorError> {
974 self.activate_calls.fetch_add(1, Ordering::SeqCst);
975 Ok(ActivationHandle::new(
976 self.provider_id.clone(),
977 bindings.len(),
978 ))
979 }
980
981 fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
982 Err(ConnectorError::Unsupported(
983 "not needed for registry tests".to_string(),
984 ))
985 }
986
987 fn payload_schema(&self) -> ProviderPayloadSchema {
988 ProviderPayloadSchema::named("FakePayload")
989 }
990
991 fn client(&self) -> Arc<dyn ConnectorClient> {
992 Arc::new(NoopClient)
993 }
994 }
995
996 #[tokio::test]
997 async fn connector_registry_rejects_duplicate_providers() {
998 let activate_calls = Arc::new(AtomicUsize::new(0));
999 let mut registry = ConnectorRegistry::empty();
1000 registry
1001 .register(Box::new(FakeConnector::new(
1002 "github",
1003 activate_calls.clone(),
1004 )))
1005 .unwrap();
1006
1007 let error = registry
1008 .register(Box::new(FakeConnector::new("github", activate_calls)))
1009 .unwrap_err();
1010 assert!(matches!(
1011 error,
1012 ConnectorError::DuplicateProvider(provider) if provider == "github"
1013 ));
1014 }
1015
1016 #[tokio::test]
1017 async fn connector_registry_activates_only_bound_connectors() {
1018 let github_calls = Arc::new(AtomicUsize::new(0));
1019 let slack_calls = Arc::new(AtomicUsize::new(0));
1020 let mut registry = ConnectorRegistry::empty();
1021 registry
1022 .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1023 .unwrap();
1024 registry
1025 .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1026 .unwrap();
1027
1028 let mut trigger_registry = TriggerRegistry::default();
1029 trigger_registry.register(TriggerBinding::new(
1030 ProviderId::from("github"),
1031 "webhook",
1032 "github.push",
1033 ));
1034 trigger_registry.register(TriggerBinding::new(
1035 ProviderId::from("github"),
1036 "webhook",
1037 "github.installation",
1038 ));
1039
1040 let handles = registry.activate_all(&trigger_registry).await.unwrap();
1041 assert_eq!(handles.len(), 1);
1042 assert_eq!(handles[0].provider.as_str(), "github");
1043 assert_eq!(handles[0].binding_count, 2);
1044 assert_eq!(github_calls.load(Ordering::SeqCst), 1);
1045 assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
1046 }
1047
1048 #[test]
1049 fn rate_limiter_scopes_tokens_by_provider_and_key() {
1050 let factory = RateLimiterFactory::new(RateLimitConfig {
1051 capacity: 1,
1052 refill_tokens: 1,
1053 refill_interval: StdDuration::from_secs(60),
1054 });
1055
1056 assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1057 assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1058 assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1059 assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1060 }
1061
1062 #[test]
1063 fn raw_inbound_json_body_preserves_raw_bytes() {
1064 let raw = RawInbound::new(
1065 "push",
1066 BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1067 br#"{"ok":true}"#.to_vec(),
1068 );
1069
1070 assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1071 }
1072
1073 #[test]
1074 fn connector_registry_lists_catalog_providers() {
1075 let registry = ConnectorRegistry::default();
1076 let providers = registry.list();
1077 assert!(providers.contains(&ProviderId::from("cron")));
1078 assert!(providers.contains(&ProviderId::from("github")));
1079 assert!(providers.contains(&ProviderId::from("webhook")));
1080 }
1081}