1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod cron;
30pub mod github;
31pub mod hmac;
32pub mod slack;
33#[cfg(test)]
34pub(crate) mod test_util;
35pub mod webhook;
36
37pub use cron::{CatchupMode, CronConnector};
38pub use github::GitHubConnector;
39pub use hmac::{
40 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
41 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER, DEFAULT_SLACK_SIGNATURE_HEADER,
42 DEFAULT_SLACK_TIMESTAMP_HEADER, DEFAULT_STANDARD_WEBHOOKS_ID_HEADER,
43 DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER, DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER,
44 DEFAULT_STRIPE_SIGNATURE_HEADER, SIGNATURE_VERIFY_AUDIT_TOPIC,
45};
46pub use slack::SlackConnector;
47use webhook::WebhookProviderProfile;
48pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
49
50pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
52
53thread_local! {
54 static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
55 RefCell::new(BTreeMap::new());
56}
57
58#[async_trait]
60pub trait Connector: Send + Sync {
61 fn provider_id(&self) -> &ProviderId;
63
64 fn kinds(&self) -> &[TriggerKind];
66
67 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
69
70 async fn activate(
72 &self,
73 bindings: &[TriggerBinding],
74 ) -> Result<ActivationHandle, ConnectorError>;
75
76 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
78 Ok(())
79 }
80
81 fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
83
84 fn payload_schema(&self) -> ProviderPayloadSchema;
86
87 fn client(&self) -> Arc<dyn ConnectorClient>;
89}
90
91#[derive(Clone, Debug, PartialEq)]
92pub enum PostNormalizeOutcome {
93 Ready(Box<TriggerEvent>),
94 DuplicateDropped,
95}
96
97pub async fn postprocess_normalized_event(
98 inbox: &InboxIndex,
99 binding_id: &str,
100 dedupe_enabled: bool,
101 dedupe_ttl: StdDuration,
102 mut event: TriggerEvent,
103) -> Result<PostNormalizeOutcome, ConnectorError> {
104 if dedupe_enabled && !event.dedupe_claimed() {
105 if !inbox
106 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
107 .await?
108 {
109 return Ok(PostNormalizeOutcome::DuplicateDropped);
110 }
111 event.mark_dedupe_claimed();
112 }
113
114 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
115}
116
117#[async_trait]
119pub trait ConnectorClient: Send + Sync {
120 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
121}
122
/// Failure categories returned by [`ConnectorClient::call`].
///
/// Every variant carries a message string; `Display` prints that message
/// without adding any prefix for the variant.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method is not known to the client.
    MethodNotFound(String),
    /// The supplied arguments were rejected.
    InvalidArgs(String),
    /// The call was rate limited.
    RateLimited(String),
    /// A transport-level failure.
    Transport(String),
    /// Any other failure.
    Other(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All variants share the same payload shape, so pick the message first
        // and format it once. Delegating to `Display` keeps width/padding
        // flags working.
        let message = match self {
            Self::MethodNotFound(text)
            | Self::InvalidArgs(text)
            | Self::RateLimited(text)
            | Self::Transport(text)
            | Self::Other(text) => text,
        };
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ClientError {}
146
147#[derive(Debug)]
149pub enum ConnectorError {
150 DuplicateProvider(String),
151 DuplicateDelivery(String),
152 UnknownProvider(String),
153 MissingHeader(String),
154 InvalidHeader {
155 name: String,
156 detail: String,
157 },
158 InvalidSignature(String),
159 TimestampOutOfWindow {
160 timestamp: OffsetDateTime,
161 now: OffsetDateTime,
162 window: time::Duration,
163 },
164 Json(String),
165 Secret(String),
166 EventLog(String),
167 Client(ClientError),
168 Unsupported(String),
169 Activation(String),
170}
171
172impl ConnectorError {
173 pub fn invalid_signature(message: impl Into<String>) -> Self {
174 Self::InvalidSignature(message.into())
175 }
176}
177
178impl fmt::Display for ConnectorError {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 match self {
181 Self::DuplicateProvider(provider) => {
182 write!(f, "connector provider `{provider}` is already registered")
183 }
184 Self::DuplicateDelivery(message) => message.fmt(f),
185 Self::UnknownProvider(provider) => {
186 write!(f, "connector provider `{provider}` is not registered")
187 }
188 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
189 Self::InvalidHeader { name, detail } => {
190 write!(f, "invalid header `{name}`: {detail}")
191 }
192 Self::InvalidSignature(message)
193 | Self::Json(message)
194 | Self::Secret(message)
195 | Self::EventLog(message)
196 | Self::Unsupported(message)
197 | Self::Activation(message) => message.fmt(f),
198 Self::TimestampOutOfWindow {
199 timestamp,
200 now,
201 window,
202 } => write!(
203 f,
204 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
205 ),
206 Self::Client(error) => error.fmt(f),
207 }
208 }
209}
210
211impl std::error::Error for ConnectorError {}
212
213impl From<crate::event_log::LogError> for ConnectorError {
214 fn from(value: crate::event_log::LogError) -> Self {
215 Self::EventLog(value.to_string())
216 }
217}
218
219impl From<crate::secrets::SecretError> for ConnectorError {
220 fn from(value: crate::secrets::SecretError) -> Self {
221 Self::Secret(value.to_string())
222 }
223}
224
225impl From<serde_json::Error> for ConnectorError {
226 fn from(value: serde_json::Error) -> Self {
227 Self::Json(value.to_string())
228 }
229}
230
231impl From<ClientError> for ConnectorError {
232 fn from(value: ClientError) -> Self {
233 Self::Client(value)
234 }
235}
236
237#[derive(Clone)]
239pub struct ConnectorCtx {
240 pub event_log: Arc<AnyEventLog>,
241 pub secrets: Arc<dyn SecretProvider>,
242 pub inbox: Arc<InboxIndex>,
243 pub metrics: Arc<MetricsRegistry>,
244 pub rate_limiter: Arc<RateLimiterFactory>,
245}
246
/// Plain-value copy of the counters held by [`MetricsRegistry`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
}

/// Atomic counters for inbox and dispatch activity.
///
/// All reads and writes use `Ordering::Relaxed`, so a snapshot is not an
/// atomic view across counters.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
}

impl MetricsRegistry {
    /// Reads every counter and returns the values as a plain struct.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ConnectorMetricsSnapshot {
            inbox_claims_written: read(&self.inbox_claims_written),
            inbox_duplicates_rejected: read(&self.inbox_duplicates_rejected),
            inbox_fast_path_hits: read(&self.inbox_fast_path_hits),
            inbox_durable_hits: read(&self.inbox_durable_hits),
            inbox_expired_entries: read(&self.inbox_expired_entries),
            inbox_active_entries: read(&self.inbox_active_entries),
            dispatch_succeeded_total: read(&self.dispatch_succeeded_total),
            dispatch_failed_total: read(&self.dispatch_failed_total),
            retry_scheduled_total: read(&self.retry_scheduled_total),
        }
    }

    /// Counts one written inbox claim.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one rejected duplicate and one fast-path hit.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected.fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one rejected duplicate and one durable hit.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected.fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the expired-entries counter; a zero count is skipped.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count == 0 {
            return;
        }
        self.inbox_expired_entries.fetch_add(count, Ordering::Relaxed);
    }

    /// Overwrites the active-entries value (it is stored, not accumulated).
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        let value = count as u64;
        self.inbox_active_entries.store(value, Ordering::Relaxed);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one scheduled retry.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Renders four counters in Prometheus text format.
    ///
    /// Each counter is emitted as a `# TYPE <name> counter` line followed by a
    /// `<name> <value>` line, in this order: `dispatch_succeeded_total`,
    /// `dispatch_failed_total`, `inbox_duplicates_total` (sourced from the
    /// rejected-duplicates counter) and `retry_scheduled_total`.
    pub fn render_prometheus(&self) -> String {
        let current = self.snapshot();
        let rows: [(&str, u64); 4] = [
            ("dispatch_succeeded_total", current.dispatch_succeeded_total),
            ("dispatch_failed_total", current.dispatch_failed_total),
            ("inbox_duplicates_total", current.inbox_duplicates_rejected),
            ("retry_scheduled_total", current.retry_scheduled_total),
        ];

        rows.iter()
            .map(|(name, value)| format!("# TYPE {name} counter\n{name} {value}\n"))
            .collect()
    }
}
356
357#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
359pub struct ProviderPayloadSchema {
360 pub harn_schema_name: String,
361 #[serde(default)]
362 pub json_schema: JsonValue,
363}
364
365impl ProviderPayloadSchema {
366 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
367 Self {
368 harn_schema_name: harn_schema_name.into(),
369 json_schema,
370 }
371 }
372
373 pub fn named(harn_schema_name: impl Into<String>) -> Self {
374 Self::new(harn_schema_name, JsonValue::Null)
375 }
376}
377
378impl Default for ProviderPayloadSchema {
379 fn default() -> Self {
380 Self::named("raw")
381 }
382}
383
384#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
386#[serde(transparent)]
387pub struct TriggerKind(String);
388
389impl TriggerKind {
390 pub fn new(value: impl Into<String>) -> Self {
391 Self(value.into())
392 }
393
394 pub fn as_str(&self) -> &str {
395 self.0.as_str()
396 }
397}
398
399impl From<&str> for TriggerKind {
400 fn from(value: &str) -> Self {
401 Self::new(value)
402 }
403}
404
405impl From<String> for TriggerKind {
406 fn from(value: String) -> Self {
407 Self::new(value)
408 }
409}
410
411#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
413pub struct TriggerBinding {
414 pub provider: ProviderId,
415 pub kind: TriggerKind,
416 pub binding_id: String,
417 #[serde(default)]
418 pub dedupe_key: Option<String>,
419 #[serde(default = "default_dedupe_retention_days")]
420 pub dedupe_retention_days: u32,
421 #[serde(default)]
422 pub config: JsonValue,
423}
424
425impl TriggerBinding {
426 pub fn new(
427 provider: ProviderId,
428 kind: impl Into<TriggerKind>,
429 binding_id: impl Into<String>,
430 ) -> Self {
431 Self {
432 provider,
433 kind: kind.into(),
434 binding_id: binding_id.into(),
435 dedupe_key: None,
436 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
437 config: JsonValue::Null,
438 }
439 }
440}
441
442fn default_dedupe_retention_days() -> u32 {
443 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
444}
445
446#[derive(Clone, Debug, Default)]
448pub struct TriggerRegistry {
449 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
450}
451
452impl TriggerRegistry {
453 pub fn register(&mut self, binding: TriggerBinding) {
454 self.bindings
455 .entry(binding.provider.clone())
456 .or_default()
457 .push(binding);
458 }
459
460 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
461 &self.bindings
462 }
463
464 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
465 self.bindings
466 .get(provider)
467 .map(Vec::as_slice)
468 .unwrap_or(&[])
469 }
470}
471
472#[derive(Clone, Debug, PartialEq, Eq)]
474pub struct ActivationHandle {
475 pub provider: ProviderId,
476 pub binding_count: usize,
477}
478
479impl ActivationHandle {
480 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
481 Self {
482 provider,
483 binding_count,
484 }
485 }
486}
487
488#[derive(Clone, Debug, PartialEq)]
490pub struct RawInbound {
491 pub kind: String,
492 pub headers: BTreeMap<String, String>,
493 pub query: BTreeMap<String, String>,
494 pub body: Vec<u8>,
495 pub received_at: OffsetDateTime,
496 pub occurred_at: Option<OffsetDateTime>,
497 pub tenant_id: Option<TenantId>,
498 pub metadata: JsonValue,
499}
500
501impl RawInbound {
502 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
503 Self {
504 kind: kind.into(),
505 headers,
506 query: BTreeMap::new(),
507 body,
508 received_at: clock::now_utc(),
509 occurred_at: None,
510 tenant_id: None,
511 metadata: JsonValue::Null,
512 }
513 }
514
515 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
516 Ok(serde_json::from_slice(&self.body)?)
517 }
518}
519
/// Token-bucket parameters shared by every bucket of a `RateLimiterFactory`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens a bucket can hold.
    pub capacity: u32,
    /// Tokens added per `refill_interval`.
    pub refill_tokens: u32,
    /// Length of one refill period.
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// Capacity of 60 tokens, refilled at one token per second.
    fn default() -> Self {
        let one_second = StdDuration::from_secs(1);
        RateLimitConfig {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: one_second,
        }
    }
}
537
538#[derive(Clone, Debug)]
539struct TokenBucket {
540 tokens: f64,
541 last_refill: ClockInstant,
542}
543
544impl TokenBucket {
545 fn full(config: RateLimitConfig) -> Self {
546 Self {
547 tokens: config.capacity as f64,
548 last_refill: clock::instant_now(),
549 }
550 }
551
552 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
553 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
554 let rate = config.refill_tokens.max(1) as f64 / interval;
555 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
556 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
557 self.last_refill = now;
558 }
559
560 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
561 self.refill(config, now);
562 if self.tokens >= 1.0 {
563 self.tokens -= 1.0;
564 true
565 } else {
566 false
567 }
568 }
569
570 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
571 if self.tokens >= 1.0 {
572 return StdDuration::ZERO;
573 }
574 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
575 let rate = config.refill_tokens.max(1) as f64 / interval;
576 let missing = (1.0 - self.tokens).max(0.0);
577 StdDuration::from_secs_f64((missing / rate).max(0.001))
578 }
579}
580
581#[derive(Debug)]
583pub struct RateLimiterFactory {
584 config: RateLimitConfig,
585 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
586}
587
588impl RateLimiterFactory {
589 pub fn new(config: RateLimitConfig) -> Self {
590 Self {
591 config,
592 buckets: Mutex::new(HashMap::new()),
593 }
594 }
595
596 pub fn config(&self) -> RateLimitConfig {
597 self.config
598 }
599
600 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
601 ScopedRateLimiter {
602 factory: self,
603 provider: provider.clone(),
604 key: key.into(),
605 }
606 }
607
608 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
609 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
610 let bucket = buckets
611 .entry((provider.as_str().to_string(), key.to_string()))
612 .or_insert_with(|| TokenBucket::full(self.config));
613 bucket.try_acquire(self.config, clock::instant_now())
614 }
615
616 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
617 loop {
618 let wait = {
619 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
620 let bucket = buckets
621 .entry((provider.as_str().to_string(), key.to_string()))
622 .or_insert_with(|| TokenBucket::full(self.config));
623 if bucket.try_acquire(self.config, clock::instant_now()) {
624 return;
625 }
626 bucket.wait_duration(self.config)
627 };
628 tokio::time::sleep(wait).await;
629 }
630 }
631}
632
633impl Default for RateLimiterFactory {
634 fn default() -> Self {
635 Self::new(RateLimitConfig::default())
636 }
637}
638
639#[derive(Clone, Debug)]
641pub struct ScopedRateLimiter<'a> {
642 factory: &'a RateLimiterFactory,
643 provider: ProviderId,
644 key: String,
645}
646
647impl<'a> ScopedRateLimiter<'a> {
648 pub fn try_acquire(&self) -> bool {
649 self.factory.try_acquire(&self.provider, &self.key)
650 }
651
652 pub async fn acquire(&self) {
653 self.factory.acquire(&self.provider, &self.key).await;
654 }
655}
656
657pub struct ConnectorRegistry {
659 connectors: BTreeMap<ProviderId, ConnectorHandle>,
660}
661
662impl ConnectorRegistry {
663 pub fn empty() -> Self {
664 Self {
665 connectors: BTreeMap::new(),
666 }
667 }
668
669 pub fn with_defaults() -> Self {
670 let mut registry = Self::empty();
671 for provider in registered_provider_metadata() {
672 registry
673 .register(default_connector_for_provider(&provider))
674 .expect("default connector registration should not fail");
675 }
676 registry
677 }
678
679 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
680 let provider = connector.provider_id().clone();
681 if self.connectors.contains_key(&provider) {
682 return Err(ConnectorError::DuplicateProvider(provider.0));
683 }
684 self.connectors
685 .insert(provider, Arc::new(AsyncMutex::new(connector)));
686 Ok(())
687 }
688
689 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
690 self.connectors.get(id).cloned()
691 }
692
693 pub fn list(&self) -> Vec<ProviderId> {
694 self.connectors.keys().cloned().collect()
695 }
696
697 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
698 for connector in self.connectors.values() {
699 connector.lock().await.init(ctx.clone()).await?;
700 }
701 Ok(())
702 }
703
704 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
705 let mut clients = BTreeMap::new();
706 for (provider, connector) in &self.connectors {
707 let client = connector.lock().await.client();
708 clients.insert(provider.clone(), client);
709 }
710 clients
711 }
712
713 pub async fn activate_all(
714 &self,
715 registry: &TriggerRegistry,
716 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
717 let mut handles = Vec::new();
718 for (provider, connector) in &self.connectors {
719 let bindings = registry.bindings_for(provider);
720 if bindings.is_empty() {
721 continue;
722 }
723 let connector = connector.lock().await;
724 handles.push(connector.activate(bindings).await?);
725 }
726 Ok(handles)
727 }
728}
729
730impl Default for ConnectorRegistry {
731 fn default() -> Self {
732 Self::with_defaults()
733 }
734}
735
736fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
737 if provider.provider == "github" {
747 return Box::new(GitHubConnector::new());
748 }
749 if provider.provider == "slack" {
750 return Box::new(SlackConnector::new());
751 }
752 match &provider.runtime {
753 ProviderRuntimeMetadata::Builtin {
754 connector,
755 default_signature_variant,
756 } => match connector.as_str() {
757 "cron" => Box::new(CronConnector::new()),
758 "webhook" => {
759 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
760 .expect("catalog webhook signature variant must be valid");
761 Box::new(GenericWebhookConnector::with_profile(
762 WebhookProviderProfile::new(
763 ProviderId::from(provider.provider.clone()),
764 provider.schema_name.clone(),
765 variant,
766 ),
767 ))
768 }
769 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
770 },
771 ProviderRuntimeMetadata::Placeholder => {
772 Box::new(PlaceholderConnector::from_metadata(provider))
773 }
774 }
775}
776
777struct PlaceholderConnector {
778 provider_id: ProviderId,
779 kinds: Vec<TriggerKind>,
780 schema_name: String,
781}
782
783impl PlaceholderConnector {
784 fn from_metadata(metadata: &ProviderMetadata) -> Self {
785 Self {
786 provider_id: ProviderId::from(metadata.provider.clone()),
787 kinds: metadata
788 .kinds
789 .iter()
790 .cloned()
791 .map(TriggerKind::from)
792 .collect(),
793 schema_name: metadata.schema_name.clone(),
794 }
795 }
796}
797
798struct PlaceholderClient;
799
800#[async_trait]
801impl ConnectorClient for PlaceholderClient {
802 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
803 Err(ClientError::Other(format!(
804 "connector client method '{method}' is not implemented for this provider"
805 )))
806 }
807}
808
809#[async_trait]
810impl Connector for PlaceholderConnector {
811 fn provider_id(&self) -> &ProviderId {
812 &self.provider_id
813 }
814
815 fn kinds(&self) -> &[TriggerKind] {
816 &self.kinds
817 }
818
819 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
820 Ok(())
821 }
822
823 async fn activate(
824 &self,
825 bindings: &[TriggerBinding],
826 ) -> Result<ActivationHandle, ConnectorError> {
827 Ok(ActivationHandle::new(
828 self.provider_id.clone(),
829 bindings.len(),
830 ))
831 }
832
833 fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
834 Err(ConnectorError::Unsupported(format!(
835 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
836 self.provider_id.as_str()
837 )))
838 }
839
840 fn payload_schema(&self) -> ProviderPayloadSchema {
841 ProviderPayloadSchema::named(self.schema_name.clone())
842 }
843
844 fn client(&self) -> Arc<dyn ConnectorClient> {
845 Arc::new(PlaceholderClient)
846 }
847}
848
849pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
850 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
851 *slot.borrow_mut() = clients
852 .into_iter()
853 .map(|(provider, client)| (provider.as_str().to_string(), client))
854 .collect();
855 });
856}
857
858pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
859 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
860}
861
862pub fn clear_active_connector_clients() {
863 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
864}
865
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client stub that echoes the requested method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    /// Connector stub that counts how often `activate` is called.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }

        /// Boxes a fake connector that shares `calls` with the test body.
        fn boxed(provider_id: &str, calls: &Arc<AtomicUsize>) -> Box<dyn Connector> {
            Box::new(Self::new(provider_id, Arc::clone(calls)))
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();

        let first = registry.register(FakeConnector::boxed("github", &calls));
        first.unwrap();

        let second = registry.register(FakeConnector::boxed("github", &calls));
        let error = second.unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));

        let mut registry = ConnectorRegistry::empty();
        registry
            .register(FakeConnector::boxed("github", &github_calls))
            .unwrap();
        registry
            .register(FakeConnector::boxed("slack", &slack_calls))
            .unwrap();

        // Only github gets bindings; slack must stay untouched.
        let mut trigger_registry = TriggerRegistry::default();
        for binding_id in ["github.push", "github.installation"] {
            trigger_registry.register(TriggerBinding::new(
                ProviderId::from("github"),
                "webhook",
                binding_id,
            ));
        }

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });
        let github = ProviderId::from("github");
        let slack = ProviderId::from("slack");

        // One token per scope: the second draw on the same scope must fail,
        // while other keys and other providers have their own bucket.
        assert!(factory.try_acquire(&github, "org:1"));
        assert!(!factory.try_acquire(&github, "org:1"));
        assert!(factory.try_acquire(&github, "org:2"));
        assert!(factory.try_acquire(&slack, "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let headers =
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]);
        let raw = RawInbound::new("push", headers, br#"{"ok":true}"#.to_vec());

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    #[test]
    fn connector_registry_lists_catalog_providers() {
        let providers = ConnectorRegistry::default().list();
        for expected in ["cron", "github", "webhook"] {
            assert!(providers.contains(&ProviderId::from(expected)));
        }
    }
}
1025}