1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod harn_module;
32pub mod hmac;
33pub mod linear;
34pub mod notion;
35pub mod slack;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod webhook;
39
40pub use cron::{CatchupMode, CronConnector};
41pub use github::GitHubConnector;
42pub use harn_module::{
43 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
44};
45pub use hmac::{
46 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
47 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
48 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
49 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
50 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
51 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
52 SIGNATURE_VERIFY_AUDIT_TOPIC,
53};
54pub use linear::LinearConnector;
55pub use notion::{
56 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
57};
58pub use slack::SlackConnector;
59use webhook::WebhookProviderProfile;
60pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
61
62pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
64
65thread_local! {
66 static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
67 RefCell::new(BTreeMap::new());
68}
69
70#[async_trait]
72pub trait Connector: Send + Sync {
73 fn provider_id(&self) -> &ProviderId;
75
76 fn kinds(&self) -> &[TriggerKind];
78
79 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
81
82 async fn activate(
84 &self,
85 bindings: &[TriggerBinding],
86 ) -> Result<ActivationHandle, ConnectorError>;
87
88 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
90 Ok(())
91 }
92
93 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
95
96 fn payload_schema(&self) -> ProviderPayloadSchema;
98
99 fn client(&self) -> Arc<dyn ConnectorClient>;
101}
102
103#[derive(Clone, Debug, PartialEq)]
104pub enum PostNormalizeOutcome {
105 Ready(Box<TriggerEvent>),
106 DuplicateDropped,
107}
108
109pub async fn postprocess_normalized_event(
110 inbox: &InboxIndex,
111 binding_id: &str,
112 dedupe_enabled: bool,
113 dedupe_ttl: StdDuration,
114 mut event: TriggerEvent,
115) -> Result<PostNormalizeOutcome, ConnectorError> {
116 if dedupe_enabled && !event.dedupe_claimed() {
117 if !inbox
118 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
119 .await?
120 {
121 return Ok(PostNormalizeOutcome::DuplicateDropped);
122 }
123 event.mark_dedupe_claimed();
124 }
125
126 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
127}
128
129#[async_trait]
131pub trait ConnectorClient: Send + Sync {
132 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
133}
134
135#[derive(Clone, Debug, PartialEq, Eq)]
137pub enum ClientError {
138 MethodNotFound(String),
139 InvalidArgs(String),
140 RateLimited(String),
141 Transport(String),
142 Other(String),
143}
144
145impl fmt::Display for ClientError {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 match self {
148 Self::MethodNotFound(message)
149 | Self::InvalidArgs(message)
150 | Self::RateLimited(message)
151 | Self::Transport(message)
152 | Self::Other(message) => message.fmt(f),
153 }
154 }
155}
156
157impl std::error::Error for ClientError {}
158
159#[derive(Debug)]
161pub enum ConnectorError {
162 DuplicateProvider(String),
163 DuplicateDelivery(String),
164 UnknownProvider(String),
165 MissingHeader(String),
166 InvalidHeader {
167 name: String,
168 detail: String,
169 },
170 InvalidSignature(String),
171 TimestampOutOfWindow {
172 timestamp: OffsetDateTime,
173 now: OffsetDateTime,
174 window: time::Duration,
175 },
176 Json(String),
177 Secret(String),
178 EventLog(String),
179 HarnRuntime(String),
180 Client(ClientError),
181 Unsupported(String),
182 Activation(String),
183}
184
185impl ConnectorError {
186 pub fn invalid_signature(message: impl Into<String>) -> Self {
187 Self::InvalidSignature(message.into())
188 }
189}
190
191impl fmt::Display for ConnectorError {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 match self {
194 Self::DuplicateProvider(provider) => {
195 write!(f, "connector provider `{provider}` is already registered")
196 }
197 Self::DuplicateDelivery(message) => message.fmt(f),
198 Self::UnknownProvider(provider) => {
199 write!(f, "connector provider `{provider}` is not registered")
200 }
201 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
202 Self::InvalidHeader { name, detail } => {
203 write!(f, "invalid header `{name}`: {detail}")
204 }
205 Self::InvalidSignature(message)
206 | Self::Json(message)
207 | Self::Secret(message)
208 | Self::EventLog(message)
209 | Self::HarnRuntime(message)
210 | Self::Unsupported(message)
211 | Self::Activation(message) => message.fmt(f),
212 Self::TimestampOutOfWindow {
213 timestamp,
214 now,
215 window,
216 } => write!(
217 f,
218 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
219 ),
220 Self::Client(error) => error.fmt(f),
221 }
222 }
223}
224
225impl std::error::Error for ConnectorError {}
226
227impl From<crate::event_log::LogError> for ConnectorError {
228 fn from(value: crate::event_log::LogError) -> Self {
229 Self::EventLog(value.to_string())
230 }
231}
232
233impl From<crate::secrets::SecretError> for ConnectorError {
234 fn from(value: crate::secrets::SecretError) -> Self {
235 Self::Secret(value.to_string())
236 }
237}
238
239impl From<serde_json::Error> for ConnectorError {
240 fn from(value: serde_json::Error) -> Self {
241 Self::Json(value.to_string())
242 }
243}
244
245impl From<ClientError> for ConnectorError {
246 fn from(value: ClientError) -> Self {
247 Self::Client(value)
248 }
249}
250
251#[derive(Clone)]
253pub struct ConnectorCtx {
254 pub event_log: Arc<AnyEventLog>,
255 pub secrets: Arc<dyn SecretProvider>,
256 pub inbox: Arc<InboxIndex>,
257 pub metrics: Arc<MetricsRegistry>,
258 pub rate_limiter: Arc<RateLimiterFactory>,
259}
260
261#[derive(Clone, Debug, Default, PartialEq, Eq)]
263pub struct ConnectorMetricsSnapshot {
264 pub inbox_claims_written: u64,
265 pub inbox_duplicates_rejected: u64,
266 pub inbox_fast_path_hits: u64,
267 pub inbox_durable_hits: u64,
268 pub inbox_expired_entries: u64,
269 pub inbox_active_entries: u64,
270 pub linear_timestamp_rejections_total: u64,
271 pub dispatch_succeeded_total: u64,
272 pub dispatch_failed_total: u64,
273 pub retry_scheduled_total: u64,
274 pub slack_delivery_success_total: u64,
275 pub slack_delivery_failure_total: u64,
276}
277
278#[derive(Debug, Default)]
280pub struct MetricsRegistry {
281 inbox_claims_written: AtomicU64,
282 inbox_duplicates_rejected: AtomicU64,
283 inbox_fast_path_hits: AtomicU64,
284 inbox_durable_hits: AtomicU64,
285 inbox_expired_entries: AtomicU64,
286 inbox_active_entries: AtomicU64,
287 linear_timestamp_rejections_total: AtomicU64,
288 dispatch_succeeded_total: AtomicU64,
289 dispatch_failed_total: AtomicU64,
290 retry_scheduled_total: AtomicU64,
291 slack_delivery_success_total: AtomicU64,
292 slack_delivery_failure_total: AtomicU64,
293 custom_counters: Mutex<BTreeMap<String, u64>>,
294}
295
296impl MetricsRegistry {
297 pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
298 ConnectorMetricsSnapshot {
299 inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
300 inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
301 inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
302 inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
303 inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
304 inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
305 linear_timestamp_rejections_total: self
306 .linear_timestamp_rejections_total
307 .load(Ordering::Relaxed),
308 dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
309 dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
310 retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
311 slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
312 slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
313 }
314 }
315
316 pub(crate) fn record_inbox_claim(&self) {
317 self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
318 }
319
320 pub(crate) fn record_inbox_duplicate_fast_path(&self) {
321 self.inbox_duplicates_rejected
322 .fetch_add(1, Ordering::Relaxed);
323 self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
324 }
325
326 pub(crate) fn record_inbox_duplicate_durable(&self) {
327 self.inbox_duplicates_rejected
328 .fetch_add(1, Ordering::Relaxed);
329 self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
330 }
331
332 pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
333 if count > 0 {
334 self.inbox_expired_entries
335 .fetch_add(count, Ordering::Relaxed);
336 }
337 }
338
339 pub(crate) fn set_inbox_active_entries(&self, count: usize) {
340 self.inbox_active_entries
341 .store(count as u64, Ordering::Relaxed);
342 }
343
344 pub fn record_linear_timestamp_rejection(&self) {
345 self.linear_timestamp_rejections_total
346 .fetch_add(1, Ordering::Relaxed);
347 }
348
349 pub fn record_dispatch_succeeded(&self) {
350 self.dispatch_succeeded_total
351 .fetch_add(1, Ordering::Relaxed);
352 }
353
354 pub fn record_dispatch_failed(&self) {
355 self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
356 }
357
358 pub fn record_retry_scheduled(&self) {
359 self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
360 }
361
362 pub fn record_slack_delivery_success(&self) {
363 self.slack_delivery_success_total
364 .fetch_add(1, Ordering::Relaxed);
365 }
366
367 pub fn record_slack_delivery_failure(&self) {
368 self.slack_delivery_failure_total
369 .fetch_add(1, Ordering::Relaxed);
370 }
371
372 pub fn record_custom_counter(&self, name: &str, amount: u64) {
373 if amount == 0 {
374 return;
375 }
376 let mut counters = self
377 .custom_counters
378 .lock()
379 .expect("custom counters poisoned");
380 *counters.entry(name.to_string()).or_default() += amount;
381 }
382
383 pub fn render_prometheus(&self) -> String {
384 let snapshot = self.snapshot();
385 let counters = [
386 (
387 "connector_linear_timestamp_rejections_total",
388 snapshot.linear_timestamp_rejections_total,
389 ),
390 (
391 "dispatch_succeeded_total",
392 snapshot.dispatch_succeeded_total,
393 ),
394 ("dispatch_failed_total", snapshot.dispatch_failed_total),
395 ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
396 ("retry_scheduled_total", snapshot.retry_scheduled_total),
397 (
398 "slack_events_delivery_success_total",
399 snapshot.slack_delivery_success_total,
400 ),
401 (
402 "slack_events_delivery_failure_total",
403 snapshot.slack_delivery_failure_total,
404 ),
405 ];
406
407 let mut rendered = String::new();
408 for (name, value) in counters {
409 rendered.push_str("# TYPE ");
410 rendered.push_str(name);
411 rendered.push_str(" counter\n");
412 rendered.push_str(name);
413 rendered.push(' ');
414 rendered.push_str(&value.to_string());
415 rendered.push('\n');
416 }
417 let custom_counters = self
418 .custom_counters
419 .lock()
420 .expect("custom counters poisoned");
421 for (name, value) in custom_counters.iter() {
422 let metric_name = format!(
423 "connector_custom_{}_total",
424 name.chars()
425 .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
426 ch
427 } else {
428 '_'
429 })
430 .collect::<String>()
431 );
432 rendered.push_str("# TYPE ");
433 rendered.push_str(&metric_name);
434 rendered.push_str(" counter\n");
435 rendered.push_str(&metric_name);
436 rendered.push(' ');
437 rendered.push_str(&value.to_string());
438 rendered.push('\n');
439 }
440 rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
441 rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
442 rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
443 rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
444 rendered
445 }
446}
447
448#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct ProviderPayloadSchema {
451 pub harn_schema_name: String,
452 #[serde(default)]
453 pub json_schema: JsonValue,
454}
455
456impl ProviderPayloadSchema {
457 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
458 Self {
459 harn_schema_name: harn_schema_name.into(),
460 json_schema,
461 }
462 }
463
464 pub fn named(harn_schema_name: impl Into<String>) -> Self {
465 Self::new(harn_schema_name, JsonValue::Null)
466 }
467}
468
469impl Default for ProviderPayloadSchema {
470 fn default() -> Self {
471 Self::named("raw")
472 }
473}
474
475#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
477#[serde(transparent)]
478pub struct TriggerKind(String);
479
480impl TriggerKind {
481 pub fn new(value: impl Into<String>) -> Self {
482 Self(value.into())
483 }
484
485 pub fn as_str(&self) -> &str {
486 self.0.as_str()
487 }
488}
489
490impl From<&str> for TriggerKind {
491 fn from(value: &str) -> Self {
492 Self::new(value)
493 }
494}
495
496impl From<String> for TriggerKind {
497 fn from(value: String) -> Self {
498 Self::new(value)
499 }
500}
501
502#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
504pub struct TriggerBinding {
505 pub provider: ProviderId,
506 pub kind: TriggerKind,
507 pub binding_id: String,
508 #[serde(default)]
509 pub dedupe_key: Option<String>,
510 #[serde(default = "default_dedupe_retention_days")]
511 pub dedupe_retention_days: u32,
512 #[serde(default)]
513 pub config: JsonValue,
514}
515
516impl TriggerBinding {
517 pub fn new(
518 provider: ProviderId,
519 kind: impl Into<TriggerKind>,
520 binding_id: impl Into<String>,
521 ) -> Self {
522 Self {
523 provider,
524 kind: kind.into(),
525 binding_id: binding_id.into(),
526 dedupe_key: None,
527 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
528 config: JsonValue::Null,
529 }
530 }
531}
532
533fn default_dedupe_retention_days() -> u32 {
534 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
535}
536
537#[derive(Clone, Debug, Default)]
539pub struct TriggerRegistry {
540 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
541}
542
543impl TriggerRegistry {
544 pub fn register(&mut self, binding: TriggerBinding) {
545 self.bindings
546 .entry(binding.provider.clone())
547 .or_default()
548 .push(binding);
549 }
550
551 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
552 &self.bindings
553 }
554
555 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
556 self.bindings
557 .get(provider)
558 .map(Vec::as_slice)
559 .unwrap_or(&[])
560 }
561}
562
563#[derive(Clone, Debug, PartialEq, Eq)]
565pub struct ActivationHandle {
566 pub provider: ProviderId,
567 pub binding_count: usize,
568}
569
570impl ActivationHandle {
571 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
572 Self {
573 provider,
574 binding_count,
575 }
576 }
577}
578
579#[derive(Clone, Debug, PartialEq)]
581pub struct RawInbound {
582 pub kind: String,
583 pub headers: BTreeMap<String, String>,
584 pub query: BTreeMap<String, String>,
585 pub body: Vec<u8>,
586 pub received_at: OffsetDateTime,
587 pub occurred_at: Option<OffsetDateTime>,
588 pub tenant_id: Option<TenantId>,
589 pub metadata: JsonValue,
590}
591
592impl RawInbound {
593 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
594 Self {
595 kind: kind.into(),
596 headers,
597 query: BTreeMap::new(),
598 body,
599 received_at: clock::now_utc(),
600 occurred_at: None,
601 tenant_id: None,
602 metadata: JsonValue::Null,
603 }
604 }
605
606 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
607 Ok(serde_json::from_slice(&self.body)?)
608 }
609}
610
611#[derive(Clone, Copy, Debug, PartialEq, Eq)]
613pub struct RateLimitConfig {
614 pub capacity: u32,
615 pub refill_tokens: u32,
616 pub refill_interval: StdDuration,
617}
618
619impl Default for RateLimitConfig {
620 fn default() -> Self {
621 Self {
622 capacity: 60,
623 refill_tokens: 1,
624 refill_interval: StdDuration::from_secs(1),
625 }
626 }
627}
628
629#[derive(Clone, Debug)]
630struct TokenBucket {
631 tokens: f64,
632 last_refill: ClockInstant,
633}
634
635impl TokenBucket {
636 fn full(config: RateLimitConfig) -> Self {
637 Self {
638 tokens: config.capacity as f64,
639 last_refill: clock::instant_now(),
640 }
641 }
642
643 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
644 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
645 let rate = config.refill_tokens.max(1) as f64 / interval;
646 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
647 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
648 self.last_refill = now;
649 }
650
651 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
652 self.refill(config, now);
653 if self.tokens >= 1.0 {
654 self.tokens -= 1.0;
655 true
656 } else {
657 false
658 }
659 }
660
661 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
662 if self.tokens >= 1.0 {
663 return StdDuration::ZERO;
664 }
665 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
666 let rate = config.refill_tokens.max(1) as f64 / interval;
667 let missing = (1.0 - self.tokens).max(0.0);
668 StdDuration::from_secs_f64((missing / rate).max(0.001))
669 }
670}
671
672#[derive(Debug)]
674pub struct RateLimiterFactory {
675 config: RateLimitConfig,
676 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
677}
678
679impl RateLimiterFactory {
680 pub fn new(config: RateLimitConfig) -> Self {
681 Self {
682 config,
683 buckets: Mutex::new(HashMap::new()),
684 }
685 }
686
687 pub fn config(&self) -> RateLimitConfig {
688 self.config
689 }
690
691 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
692 ScopedRateLimiter {
693 factory: self,
694 provider: provider.clone(),
695 key: key.into(),
696 }
697 }
698
699 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
700 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
701 let bucket = buckets
702 .entry((provider.as_str().to_string(), key.to_string()))
703 .or_insert_with(|| TokenBucket::full(self.config));
704 bucket.try_acquire(self.config, clock::instant_now())
705 }
706
707 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
708 loop {
709 let wait = {
710 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
711 let bucket = buckets
712 .entry((provider.as_str().to_string(), key.to_string()))
713 .or_insert_with(|| TokenBucket::full(self.config));
714 if bucket.try_acquire(self.config, clock::instant_now()) {
715 return;
716 }
717 bucket.wait_duration(self.config)
718 };
719 tokio::time::sleep(wait).await;
720 }
721 }
722}
723
724impl Default for RateLimiterFactory {
725 fn default() -> Self {
726 Self::new(RateLimitConfig::default())
727 }
728}
729
730#[derive(Clone, Debug)]
732pub struct ScopedRateLimiter<'a> {
733 factory: &'a RateLimiterFactory,
734 provider: ProviderId,
735 key: String,
736}
737
738impl<'a> ScopedRateLimiter<'a> {
739 pub fn try_acquire(&self) -> bool {
740 self.factory.try_acquire(&self.provider, &self.key)
741 }
742
743 pub async fn acquire(&self) {
744 self.factory.acquire(&self.provider, &self.key).await;
745 }
746}
747
748pub struct ConnectorRegistry {
750 connectors: BTreeMap<ProviderId, ConnectorHandle>,
751}
752
753impl ConnectorRegistry {
754 pub fn empty() -> Self {
755 Self {
756 connectors: BTreeMap::new(),
757 }
758 }
759
760 pub fn with_defaults() -> Self {
761 let mut registry = Self::empty();
762 for provider in registered_provider_metadata() {
763 registry
764 .register(default_connector_for_provider(&provider))
765 .expect("default connector registration should not fail");
766 }
767 registry
768 }
769
770 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
771 let provider = connector.provider_id().clone();
772 if self.connectors.contains_key(&provider) {
773 return Err(ConnectorError::DuplicateProvider(provider.0));
774 }
775 self.connectors
776 .insert(provider, Arc::new(AsyncMutex::new(connector)));
777 Ok(())
778 }
779
780 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
781 self.connectors.get(id).cloned()
782 }
783
784 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
785 self.connectors.remove(id)
786 }
787
788 pub fn list(&self) -> Vec<ProviderId> {
789 self.connectors.keys().cloned().collect()
790 }
791
792 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
793 for connector in self.connectors.values() {
794 connector.lock().await.init(ctx.clone()).await?;
795 }
796 Ok(())
797 }
798
799 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
800 let mut clients = BTreeMap::new();
801 for (provider, connector) in &self.connectors {
802 let client = connector.lock().await.client();
803 clients.insert(provider.clone(), client);
804 }
805 clients
806 }
807
808 pub async fn activate_all(
809 &self,
810 registry: &TriggerRegistry,
811 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
812 let mut handles = Vec::new();
813 for (provider, connector) in &self.connectors {
814 let bindings = registry.bindings_for(provider);
815 if bindings.is_empty() {
816 continue;
817 }
818 let connector = connector.lock().await;
819 handles.push(connector.activate(bindings).await?);
820 }
821 Ok(handles)
822 }
823}
824
825impl Default for ConnectorRegistry {
826 fn default() -> Self {
827 Self::with_defaults()
828 }
829}
830
831fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
832 if provider.provider == "github" {
842 return Box::new(GitHubConnector::new());
843 }
844 if provider.provider == "linear" {
845 return Box::new(LinearConnector::new());
846 }
847 if provider.provider == "slack" {
848 return Box::new(SlackConnector::new());
849 }
850 if provider.provider == "notion" {
851 return Box::new(NotionConnector::new());
852 }
853 match &provider.runtime {
854 ProviderRuntimeMetadata::Builtin {
855 connector,
856 default_signature_variant,
857 } => match connector.as_str() {
858 "cron" => Box::new(CronConnector::new()),
859 "webhook" => {
860 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
861 .expect("catalog webhook signature variant must be valid");
862 Box::new(GenericWebhookConnector::with_profile(
863 WebhookProviderProfile::new(
864 ProviderId::from(provider.provider.clone()),
865 provider.schema_name.clone(),
866 variant,
867 ),
868 ))
869 }
870 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
871 },
872 ProviderRuntimeMetadata::Placeholder => {
873 Box::new(PlaceholderConnector::from_metadata(provider))
874 }
875 }
876}
877
878struct PlaceholderConnector {
879 provider_id: ProviderId,
880 kinds: Vec<TriggerKind>,
881 schema_name: String,
882}
883
884impl PlaceholderConnector {
885 fn from_metadata(metadata: &ProviderMetadata) -> Self {
886 Self {
887 provider_id: ProviderId::from(metadata.provider.clone()),
888 kinds: metadata
889 .kinds
890 .iter()
891 .cloned()
892 .map(TriggerKind::from)
893 .collect(),
894 schema_name: metadata.schema_name.clone(),
895 }
896 }
897}
898
899struct PlaceholderClient;
900
901#[async_trait]
902impl ConnectorClient for PlaceholderClient {
903 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
904 Err(ClientError::Other(format!(
905 "connector client method '{method}' is not implemented for this provider"
906 )))
907 }
908}
909
910#[async_trait]
911impl Connector for PlaceholderConnector {
912 fn provider_id(&self) -> &ProviderId {
913 &self.provider_id
914 }
915
916 fn kinds(&self) -> &[TriggerKind] {
917 &self.kinds
918 }
919
920 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
921 Ok(())
922 }
923
924 async fn activate(
925 &self,
926 bindings: &[TriggerBinding],
927 ) -> Result<ActivationHandle, ConnectorError> {
928 Ok(ActivationHandle::new(
929 self.provider_id.clone(),
930 bindings.len(),
931 ))
932 }
933
934 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
935 Err(ConnectorError::Unsupported(format!(
936 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
937 self.provider_id.as_str()
938 )))
939 }
940
941 fn payload_schema(&self) -> ProviderPayloadSchema {
942 ProviderPayloadSchema::named(self.schema_name.clone())
943 }
944
945 fn client(&self) -> Arc<dyn ConnectorClient> {
946 Arc::new(PlaceholderClient)
947 }
948}
949
950pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
951 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
952 *slot.borrow_mut() = clients
953 .into_iter()
954 .map(|(provider, client)| (provider.as_str().to_string(), client))
955 .collect();
956 });
957}
958
959pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
960 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
961}
962
963pub fn clear_active_connector_clients() {
964 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
965}
966
967#[cfg(test)]
968mod tests {
969 use super::*;
970
971 use std::sync::atomic::{AtomicUsize, Ordering};
972
973 use async_trait::async_trait;
974 use serde_json::json;
975
976 struct NoopClient;
977
978 #[async_trait]
979 impl ConnectorClient for NoopClient {
980 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
981 Ok(json!({ "method": method }))
982 }
983 }
984
985 struct FakeConnector {
986 provider_id: ProviderId,
987 kinds: Vec<TriggerKind>,
988 activate_calls: Arc<AtomicUsize>,
989 }
990
991 impl FakeConnector {
992 fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
993 Self {
994 provider_id: ProviderId::from(provider_id),
995 kinds: vec![TriggerKind::from("webhook")],
996 activate_calls,
997 }
998 }
999 }
1000
1001 #[async_trait]
1002 impl Connector for FakeConnector {
1003 fn provider_id(&self) -> &ProviderId {
1004 &self.provider_id
1005 }
1006
1007 fn kinds(&self) -> &[TriggerKind] {
1008 &self.kinds
1009 }
1010
1011 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1012 Ok(())
1013 }
1014
1015 async fn activate(
1016 &self,
1017 bindings: &[TriggerBinding],
1018 ) -> Result<ActivationHandle, ConnectorError> {
1019 self.activate_calls.fetch_add(1, Ordering::SeqCst);
1020 Ok(ActivationHandle::new(
1021 self.provider_id.clone(),
1022 bindings.len(),
1023 ))
1024 }
1025
1026 async fn normalize_inbound(
1027 &self,
1028 _raw: RawInbound,
1029 ) -> Result<TriggerEvent, ConnectorError> {
1030 Err(ConnectorError::Unsupported(
1031 "not needed for registry tests".to_string(),
1032 ))
1033 }
1034
1035 fn payload_schema(&self) -> ProviderPayloadSchema {
1036 ProviderPayloadSchema::named("FakePayload")
1037 }
1038
1039 fn client(&self) -> Arc<dyn ConnectorClient> {
1040 Arc::new(NoopClient)
1041 }
1042 }
1043
1044 #[tokio::test]
1045 async fn connector_registry_rejects_duplicate_providers() {
1046 let activate_calls = Arc::new(AtomicUsize::new(0));
1047 let mut registry = ConnectorRegistry::empty();
1048 registry
1049 .register(Box::new(FakeConnector::new(
1050 "github",
1051 activate_calls.clone(),
1052 )))
1053 .unwrap();
1054
1055 let error = registry
1056 .register(Box::new(FakeConnector::new("github", activate_calls)))
1057 .unwrap_err();
1058 assert!(matches!(
1059 error,
1060 ConnectorError::DuplicateProvider(provider) if provider == "github"
1061 ));
1062 }
1063
1064 #[tokio::test]
1065 async fn connector_registry_activates_only_bound_connectors() {
1066 let github_calls = Arc::new(AtomicUsize::new(0));
1067 let slack_calls = Arc::new(AtomicUsize::new(0));
1068 let mut registry = ConnectorRegistry::empty();
1069 registry
1070 .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1071 .unwrap();
1072 registry
1073 .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1074 .unwrap();
1075
1076 let mut trigger_registry = TriggerRegistry::default();
1077 trigger_registry.register(TriggerBinding::new(
1078 ProviderId::from("github"),
1079 "webhook",
1080 "github.push",
1081 ));
1082 trigger_registry.register(TriggerBinding::new(
1083 ProviderId::from("github"),
1084 "webhook",
1085 "github.installation",
1086 ));
1087
1088 let handles = registry.activate_all(&trigger_registry).await.unwrap();
1089 assert_eq!(handles.len(), 1);
1090 assert_eq!(handles[0].provider.as_str(), "github");
1091 assert_eq!(handles[0].binding_count, 2);
1092 assert_eq!(github_calls.load(Ordering::SeqCst), 1);
1093 assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
1094 }
1095
1096 #[test]
1097 fn rate_limiter_scopes_tokens_by_provider_and_key() {
1098 let factory = RateLimiterFactory::new(RateLimitConfig {
1099 capacity: 1,
1100 refill_tokens: 1,
1101 refill_interval: StdDuration::from_secs(60),
1102 });
1103
1104 assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
1105 assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
1106 assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
1107 assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
1108 }
1109
1110 #[test]
1111 fn raw_inbound_json_body_preserves_raw_bytes() {
1112 let raw = RawInbound::new(
1113 "push",
1114 BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
1115 br#"{"ok":true}"#.to_vec(),
1116 );
1117
1118 assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
1119 }
1120
1121 #[test]
1122 fn connector_registry_lists_catalog_providers() {
1123 let registry = ConnectorRegistry::default();
1124 let providers = registry.list();
1125 assert!(providers.contains(&ProviderId::from("cron")));
1126 assert!(providers.contains(&ProviderId::from("github")));
1127 assert!(providers.contains(&ProviderId::from("webhook")));
1128 }
1129}