1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37#[cfg(test)]
38pub(crate) mod test_util;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use github::GitHubConnector;
44pub use harn_module::{
45 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
46};
47pub use hmac::{
48 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
49 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
50 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
51 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
52 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
53 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
54 SIGNATURE_VERIFY_AUDIT_TOPIC,
55};
56pub use linear::LinearConnector;
57pub use notion::{
58 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
59};
60pub use slack::SlackConnector;
61use webhook::WebhookProviderProfile;
62pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
63
/// Shared, async-locked handle to a boxed connector, as stored in the registry.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
66
thread_local! {
    // Per-thread map of connector clients keyed by a string id.
    // NOTE(review): install/lookup sites live elsewhere in the crate —
    // presumably used to override `Connector::client` in tests; confirm at call sites.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
71
/// A trigger-source integration (GitHub, Slack, cron, generic webhook, …).
///
/// Implementations turn provider-specific inbound deliveries into
/// normalized [`TriggerEvent`]s.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can produce.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with the shared runtime services.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the given bindings, returning a summary handle.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown within `_deadline`; default implementation is a no-op.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts a raw inbound delivery into a normalized trigger event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Schema describing the payloads this connector emits.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Client handle for invoking provider-side operations.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
104
/// Result of post-normalization dedupe processing.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event cleared (or skipped) dedupe and should be dispatched.
    /// Boxed to keep the enum small relative to `TriggerEvent`.
    Ready(Box<TriggerEvent>),
    /// The event's dedupe key was already claimed within the TTL; drop it.
    DuplicateDropped,
}
110
111pub async fn postprocess_normalized_event(
112 inbox: &InboxIndex,
113 binding_id: &str,
114 dedupe_enabled: bool,
115 dedupe_ttl: StdDuration,
116 mut event: TriggerEvent,
117) -> Result<PostNormalizeOutcome, ConnectorError> {
118 if dedupe_enabled && !event.dedupe_claimed() {
119 if !inbox
120 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
121 .await?
122 {
123 return Ok(PostNormalizeOutcome::DuplicateDropped);
124 }
125 event.mark_dedupe_claimed();
126 }
127
128 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
129}
130
/// Minimal dynamic RPC surface for calling a provider's API:
/// a named method plus JSON arguments, returning JSON.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with `args`; errors are classified via [`ClientError`].
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
136
/// Failure classes for [`ConnectorClient::call`]; each carries a
/// human-readable message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method is not implemented by this client.
    MethodNotFound(String),
    /// Arguments did not match the method's expectations.
    InvalidArgs(String),
    /// The provider rejected the call due to rate limiting.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Anything that does not fit the categories above.
    Other(String),
}
146
147impl fmt::Display for ClientError {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 match self {
150 Self::MethodNotFound(message)
151 | Self::InvalidArgs(message)
152 | Self::RateLimited(message)
153 | Self::Transport(message)
154 | Self::Other(message) => message.fmt(f),
155 }
156 }
157}
158
159impl std::error::Error for ClientError {}
160
/// Errors surfaced by connectors during init, activation, inbound
/// verification/normalization, and provider calls.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider id is already registered.
    DuplicateProvider(String),
    /// The delivery was recognized as a duplicate.
    DuplicateDelivery(String),
    /// No connector is registered for this provider id.
    UnknownProvider(String),
    /// A required HTTP header was absent from the inbound request.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// A signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event log append/read failure.
    EventLog(String),
    /// Failure inside the harn connector runtime.
    HarnRuntime(String),
    /// Error returned by the provider client.
    Client(ClientError),
    /// The operation is not supported by this connector.
    Unsupported(String),
    /// Activation of bindings failed.
    Activation(String),
}
186
impl ConnectorError {
    /// Convenience constructor for [`ConnectorError::InvalidSignature`].
    pub fn invalid_signature(message: impl Into<String>) -> Self {
        Self::InvalidSignature(message.into())
    }
}
192
impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Variants that need contextual phrasing around the payload.
            Self::DuplicateProvider(provider) => {
                write!(f, "connector provider `{provider}` is already registered")
            }
            Self::DuplicateDelivery(message) => message.fmt(f),
            Self::UnknownProvider(provider) => {
                write!(f, "connector provider `{provider}` is not registered")
            }
            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
            Self::InvalidHeader { name, detail } => {
                write!(f, "invalid header `{name}`: {detail}")
            }
            // Variants whose message is already self-describing: print verbatim.
            Self::InvalidSignature(message)
            | Self::Json(message)
            | Self::Secret(message)
            | Self::EventLog(message)
            | Self::HarnRuntime(message)
            | Self::Unsupported(message)
            | Self::Activation(message) => message.fmt(f),
            Self::TimestampOutOfWindow {
                timestamp,
                now,
                window,
            } => write!(
                f,
                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
            ),
            // Delegate to the client error's own Display.
            Self::Client(error) => error.fmt(f),
        }
    }
}

impl std::error::Error for ConnectorError {}
228
// The `From` impls below stringify foreign errors so `ConnectorError` stays
// `Debug`-only and does not carry the source error types in its variants.

impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

// Client errors are kept structured so callers can still match on them.
impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
252
/// Shared runtime services handed to each connector via [`Connector::init`].
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Event log for audit/delivery records.
    pub event_log: Arc<AnyEventLog>,
    /// Secret lookup (credentials, signing keys).
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Process-wide metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory of per-(provider, key) rate-limit buckets.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
262
/// Owned point-in-time copy of the fixed connector counters/gauges,
/// produced by [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    // Gauge (last stored value), not a monotonic counter.
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
279
/// Sorted label set; `BTreeMap` gives a deterministic render order.
type MetricLabels = BTreeMap<String, String>;

/// Cumulative histogram state: per-`le` bucket counts plus total count/sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // Keyed by the rendered `le` bound (e.g. "0.5", "+Inf").
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
288
// Process-wide slot holding the currently installed metrics registry.
// The `OnceLock` lazily creates the `Mutex`; the inner `Option` carries the
// install/clear state.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();

/// Installs `metrics` as the process-wide active registry, replacing any
/// previously installed one.
pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
    let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
    *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
}

/// Clears the active registry. A no-op if nothing was ever installed.
pub fn clear_active_metrics_registry() {
    if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
        *slot.lock().expect("active metrics registry poisoned") = None;
    }
}

/// Returns a handle to the currently installed registry, if any.
pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
    ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
        slot.lock()
            .expect("active metrics registry poisoned")
            .clone()
    })
}
309
/// Process metrics store: lock-free atomics for the fixed connector
/// counters, plus mutex-guarded maps for labelled counters, gauges and
/// histograms rendered by [`MetricsRegistry::render_prometheus`].
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Gauge semantics: `store`d, not `fetch_add`ed.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters keyed by caller-supplied name (sanitized at render).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled metric families keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
}
330
impl MetricsRegistry {
    /// Bucket upper bounds (seconds) shared by duration histograms.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket upper bounds (bytes) shared by payload-size histograms.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Point-in-time copy of the fixed connector counters/gauges.
    ///
    /// Relaxed loads suffice: fields are independent and only need to be
    /// approximately consistent with one another.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Counts one dedupe claim written to the inbox.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected by the durable index.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` expired inbox entries; zero is a no-op.
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the inbox active-entries gauge.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts a Linear webhook rejected for a stale/out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `amount` to a free-form named counter; zero increments are
    /// dropped (and do NOT pre-register the counter name).
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        *counters.entry(name.to_string()).or_default() += amount;
    }

    /// Records one HTTP request: a labelled request counter plus duration
    /// and body-size histograms keyed by endpoint.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }

    /// Counts an inbound trigger event for (trigger, provider).
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts a trigger dropped by dedupe, labelled with the drop reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Counts a predicate evaluation (labelled true/false) and records its
    /// cost. Negative costs are clamped to zero before observation.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts a trigger dispatch, labelled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts a trigger retry, labelled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts a trigger landing in the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Overwrites the in-flight gauge for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Overwrites today's spend gauge for a trigger (clamped at >= 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts a budget-exhaustion event, labelled by mitigation strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Records an event-log append: duration histogram plus a gauge of the
    /// last payload size per topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Overwrites the consumer-lag gauge for (topic, consumer).
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Records one agent-to-agent hop: a counter by outcome and a duration
    /// histogram by target.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Overwrites the depth gauge for a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Records the age of a claimed queue item (clamped at >= 0).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Counts one LLM call and accumulates its cost. A zero/negative cost
    /// still registers the cost counter (at its current value) so the
    /// series renders from the first call onward.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts an LLM cache hit for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }

    /// Renders the Prometheus text exposition: fixed legacy counters,
    /// sanitized custom counters, hard-coded Slack auto-disable threshold
    /// gauges, then the generic labelled metric families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Fixed (legacy) metric names paired with their snapshot values.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Custom counter names are sanitized to [A-Za-z0-9_] so arbitrary
        // caller-supplied names stay valid Prometheus identifiers.
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static configuration thresholds exported for dashboards/alerts.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }

    /// Adds `amount` to a labelled counter. Non-positive and non-finite
    /// amounts are dropped to keep counters monotonic.
    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
        let amount = amount.into();
        if amount <= 0.0 || !amount.is_finite() {
            return;
        }
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        *counters.entry((name.to_string(), labels)).or_default() += amount;
    }

    /// Registers a zero-valued counter so the series renders even before
    /// its first positive increment.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Overwrites a labelled gauge with `value`.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }

    /// Records `value` into a labelled cumulative histogram.
    ///
    /// Buckets are created from `bucket_bounds` on first observation (plus
    /// the implicit `+Inf` bucket). Prometheus-style cumulative semantics:
    /// every bucket whose bound is >= `value` is incremented, and `+Inf`
    /// counts every observation. Non-finite values are dropped.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }

    /// Appends the generic counter/gauge/histogram families to `rendered`.
    ///
    /// The maps are cloned up front so no lock is held while rendering.
    /// Families are enumerated from `metric_family_names`, so every known
    /// family gets a `# TYPE` header even when it has no samples yet.
    fn render_generic_metrics(&self, rendered: &mut String) {
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // Emit `<name>_bucket{...,le="bound"}` for each bucket, then
                // the `_sum` and `_count` series.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
}
816
/// Prometheus metric family kinds, used to select `# TYPE` headers.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
823
/// Fixed list of generic metric family names for each kind.
///
/// `render_generic_metrics` iterates these so `# TYPE` headers come out in
/// a stable order; a new generic metric must be added here to render.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
        ],
    }
}
857
858fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
859 pairs
860 .into_iter()
861 .map(|(name, value)| (name.to_string(), value.to_string()))
862 .collect()
863}
864
865fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
866 rendered.push_str(name);
867 if !labels.is_empty() {
868 rendered.push('{');
869 for (index, (label, label_value)) in labels.iter().enumerate() {
870 if index > 0 {
871 rendered.push(',');
872 }
873 rendered.push_str(label);
874 rendered.push_str("=\"");
875 rendered.push_str(&escape_label_value(label_value));
876 rendered.push('"');
877 }
878 rendered.push('}');
879 }
880 rendered.push(' ');
881 rendered.push_str(&prometheus_float(value));
882 rendered.push('\n');
883}
884
/// Escapes a Prometheus label value: backslash, double quote and newline
/// become `\\`, `\"` and `\n`; all other characters pass through.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
896
/// Formats an `f64` for the Prometheus text exposition format.
///
/// Prometheus spells the special values `+Inf`, `-Inf` and `NaN`
/// (case-sensitive); Rust's default float formatting would produce
/// `inf`/`-inf`/`NaN` instead, so they are handled explicitly up front.
/// (The previous version fell through to `{:.6}` for negative infinity
/// and emitted the unparseable `-inf`.)
///
/// Integral values render without a decimal point ("5", not "5.000000");
/// fractional values render with up to six decimals, trailing zeros and a
/// dangling decimal point stripped ("0.250000" -> "0.25").
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
911
/// Describes the payload shape a connector emits: a harn-side schema name
/// plus an optional inline JSON Schema document.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    pub harn_schema_name: String,
    // Defaults to `null` when absent from serialized form.
    #[serde(default)]
    pub json_schema: JsonValue,
}
919
920impl ProviderPayloadSchema {
921 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
922 Self {
923 harn_schema_name: harn_schema_name.into(),
924 json_schema,
925 }
926 }
927
928 pub fn named(harn_schema_name: impl Into<String>) -> Self {
929 Self::new(harn_schema_name, JsonValue::Null)
930 }
931}
932
impl Default for ProviderPayloadSchema {
    /// The fallback schema: opaque "raw" payloads with no JSON Schema.
    fn default() -> Self {
        Self::named("raw")
    }
}
938
/// Newtype over the provider-specific trigger kind string (e.g. an event
/// name); serializes transparently as the inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
943
944impl TriggerKind {
945 pub fn new(value: impl Into<String>) -> Self {
946 Self(value.into())
947 }
948
949 pub fn as_str(&self) -> &str {
950 self.0.as_str()
951 }
952}
953
// Ergonomic conversions so call sites can pass plain strings.

impl From<&str> for TriggerKind {
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}

impl From<String> for TriggerKind {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}
965
/// A configured trigger subscription: a (provider, kind) pair plus the
/// binding id used as the dedupe namespace (see `postprocess_normalized_event`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    pub provider: ProviderId,
    pub kind: TriggerKind,
    pub binding_id: String,
    // Optional dedupe-key override. NOTE(review): exact semantics are
    // defined by the consuming connector — confirm at call sites.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    // Retention of dedupe claims, in days; defaults to the crate-wide value.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    // Provider-specific configuration payload; `null` when absent.
    #[serde(default)]
    pub config: JsonValue,
}
979
980impl TriggerBinding {
981 pub fn new(
982 provider: ProviderId,
983 kind: impl Into<TriggerKind>,
984 binding_id: impl Into<String>,
985 ) -> Self {
986 Self {
987 provider,
988 kind: kind.into(),
989 binding_id: binding_id.into(),
990 dedupe_key: None,
991 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
992 config: JsonValue::Null,
993 }
994 }
995}
996
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1000
/// In-memory collection of trigger bindings, grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1006
1007impl TriggerRegistry {
1008 pub fn register(&mut self, binding: TriggerBinding) {
1009 self.bindings
1010 .entry(binding.provider.clone())
1011 .or_default()
1012 .push(binding);
1013 }
1014
1015 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1016 &self.bindings
1017 }
1018
1019 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1020 self.bindings
1021 .get(provider)
1022 .map(Vec::as_slice)
1023 .unwrap_or(&[])
1024 }
1025}
1026
/// Summary returned by [`Connector::activate`]: which provider activated
/// and how many bindings it took on.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    pub provider: ProviderId,
    pub binding_count: usize,
}
1033
1034impl ActivationHandle {
1035 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1036 Self {
1037 provider,
1038 binding_count,
1039 }
1040 }
1041}
1042
/// An unparsed inbound delivery (webhook request, push, tick) as handed to
/// [`Connector::normalize_inbound`].
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    // Free-form delivery kind string; interpretation is connector-specific.
    pub kind: String,
    pub headers: BTreeMap<String, String>,
    pub query: BTreeMap<String, String>,
    // Raw request body; parse via `json_body` when JSON is expected.
    pub body: Vec<u8>,
    // When this process received the delivery (test-aware clock).
    pub received_at: OffsetDateTime,
    // When the event occurred at the provider, if known.
    pub occurred_at: Option<OffsetDateTime>,
    pub tenant_id: Option<TenantId>,
    // Transport-level extras; `null` when absent.
    pub metadata: JsonValue,
}
1055
1056impl RawInbound {
1057 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1058 Self {
1059 kind: kind.into(),
1060 headers,
1061 query: BTreeMap::new(),
1062 body,
1063 received_at: clock::now_utc(),
1064 occurred_at: None,
1065 tenant_id: None,
1066 metadata: JsonValue::Null,
1067 }
1068 }
1069
1070 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1071 Ok(serde_json::from_slice(&self.body)?)
1072 }
1073}
1074
/// Token-bucket parameters: burst `capacity`, plus `refill_tokens` added
/// every `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}
1082
impl Default for RateLimitConfig {
    /// Default: burst of 60, refilling 1 token per second.
    fn default() -> Self {
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1092
/// One token bucket: fractional token balance plus the instant of the last
/// refill (test-aware clock).
#[derive(Clone, Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: ClockInstant,
}
1098
impl TokenBucket {
    /// A bucket starting at full capacity.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    /// Credits tokens accrued since the last refill, capped at capacity.
    ///
    /// The `max(f64::EPSILON)` / `max(1)` clamps guard against degenerate
    /// configs (zero interval, zero refill amount, zero capacity) so the
    /// rate math never divides by zero and capacity is at least one.
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    /// Refills, then takes one token if available. Returns whether a token
    /// was taken.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimated wait until at least one token is available, floored at
    /// 1ms so async callers never busy-spin. Zero when a token is already
    /// available.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1135
/// Shared rate limiter: one token bucket per (provider, key) pair, all
/// using the same config, created lazily on first use.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1142
impl RateLimiterFactory {
    /// Creates a factory whose buckets all use `config`.
    pub fn new(config: RateLimitConfig) -> Self {
        Self {
            config,
            buckets: Mutex::new(HashMap::new()),
        }
    }

    /// The config applied to every bucket.
    pub fn config(&self) -> RateLimitConfig {
        self.config
    }

    /// Returns a limiter pre-bound to a (provider, key) scope.
    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
        ScopedRateLimiter {
            factory: self,
            provider: provider.clone(),
            key: key.into(),
        }
    }

    /// Non-blocking: takes a token from the (provider, key) bucket if one
    /// is available, creating the bucket (full) on first use.
    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
        let bucket = buckets
            .entry((provider.as_str().to_string(), key.to_string()))
            .or_insert_with(|| TokenBucket::full(self.config));
        bucket.try_acquire(self.config, clock::instant_now())
    }

    /// Awaits until a token can be taken from the (provider, key) bucket.
    ///
    /// The std `Mutex` guard lives only inside the inner block and is
    /// dropped before the `.await`, so the lock is never held across an
    /// await point; the loop re-checks after sleeping for the bucket's
    /// estimated wait.
    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
        loop {
            let wait = {
                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
                let bucket = buckets
                    .entry((provider.as_str().to_string(), key.to_string()))
                    .or_insert_with(|| TokenBucket::full(self.config));
                if bucket.try_acquire(self.config, clock::instant_now()) {
                    return;
                }
                bucket.wait_duration(self.config)
            };
            tokio::time::sleep(wait).await;
        }
    }
}
1187
1188impl Default for RateLimiterFactory {
1189 fn default() -> Self {
1190 Self::new(RateLimitConfig::default())
1191 }
1192}
1193
/// A rate limiter pre-bound to one (provider, key) scope, borrowing the
/// factory that owns the underlying buckets.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    // Factory holding the shared bucket map and configuration.
    factory: &'a RateLimiterFactory,
    // Provider scope captured at construction time.
    provider: ProviderId,
    // Caller-supplied sub-scope key (tests use values like "org:1").
    key: String,
}
1201
1202impl<'a> ScopedRateLimiter<'a> {
1203 pub fn try_acquire(&self) -> bool {
1204 self.factory.try_acquire(&self.provider, &self.key)
1205 }
1206
1207 pub async fn acquire(&self) {
1208 self.factory.acquire(&self.provider, &self.key).await;
1209 }
1210}
1211
/// Registry of connectors keyed by provider id.
///
/// Backed by a `BTreeMap` so iteration (init, activation, listing) happens in
/// a stable, deterministic provider order.
pub struct ConnectorRegistry {
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1216
impl ConnectorRegistry {
    /// Creates a registry with no connectors.
    pub fn empty() -> Self {
        Self {
            connectors: BTreeMap::new(),
        }
    }

    /// Creates a registry pre-populated with the default connector for every
    /// provider in the catalog (`registered_provider_metadata`).
    ///
    /// Panics on registration failure, which would indicate duplicate
    /// provider ids in the catalog itself — a programming error.
    pub fn with_defaults() -> Self {
        let mut registry = Self::empty();
        for provider in registered_provider_metadata() {
            registry
                .register(default_connector_for_provider(&provider))
                .expect("default connector registration should not fail");
        }
        registry
    }

    /// Registers a connector under its own `provider_id`.
    ///
    /// Returns `ConnectorError::DuplicateProvider` if a connector for the
    /// same provider is already present.
    pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
        let provider = connector.provider_id().clone();
        if self.connectors.contains_key(&provider) {
            return Err(ConnectorError::DuplicateProvider(provider.0));
        }
        self.connectors
            .insert(provider, Arc::new(AsyncMutex::new(connector)));
        Ok(())
    }

    /// Returns a cloned handle to the connector for `id`, if registered.
    pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.get(id).cloned()
    }

    /// Removes and returns the connector for `id`, if registered.
    pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
        self.connectors.remove(id)
    }

    /// Lists registered provider ids in map (sorted) order.
    pub fn list(&self) -> Vec<ProviderId> {
        self.connectors.keys().cloned().collect()
    }

    /// Initializes every connector sequentially with a clone of `ctx`,
    /// stopping at the first failure.
    pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
        for connector in self.connectors.values() {
            connector.lock().await.init(ctx.clone()).await?;
        }
        Ok(())
    }

    /// Collects one client per registered provider. Each connector's async
    /// lock is held only long enough to obtain its client.
    pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
        let mut clients = BTreeMap::new();
        for (provider, connector) in &self.connectors {
            let client = connector.lock().await.client();
            clients.insert(provider.clone(), client);
        }
        clients
    }

    /// Activates every connector that has at least one binding in `registry`,
    /// returning the activation handles in provider order. Connectors with no
    /// bindings are skipped without being locked.
    ///
    /// NOTE: the connector's async lock is held across its `activate` await.
    pub async fn activate_all(
        &self,
        registry: &TriggerRegistry,
    ) -> Result<Vec<ActivationHandle>, ConnectorError> {
        let mut handles = Vec::new();
        for (provider, connector) in &self.connectors {
            let bindings = registry.bindings_for(provider);
            if bindings.is_empty() {
                continue;
            }
            let connector = connector.lock().await;
            handles.push(connector.activate(bindings).await?);
        }
        Ok(handles)
    }
}
1288
1289impl Default for ConnectorRegistry {
1290 fn default() -> Self {
1291 Self::with_defaults()
1292 }
1293}
1294
1295fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1296 if provider.provider == "github" {
1306 return Box::new(GitHubConnector::new());
1307 }
1308 if provider.provider == "linear" {
1309 return Box::new(LinearConnector::new());
1310 }
1311 if provider.provider == "slack" {
1312 return Box::new(SlackConnector::new());
1313 }
1314 if provider.provider == "notion" {
1315 return Box::new(NotionConnector::new());
1316 }
1317 if provider.provider == "a2a-push" {
1318 return Box::new(A2aPushConnector::new());
1319 }
1320 match &provider.runtime {
1321 ProviderRuntimeMetadata::Builtin {
1322 connector,
1323 default_signature_variant,
1324 } => match connector.as_str() {
1325 "cron" => Box::new(CronConnector::new()),
1326 "webhook" => {
1327 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1328 .expect("catalog webhook signature variant must be valid");
1329 Box::new(GenericWebhookConnector::with_profile(
1330 WebhookProviderProfile::new(
1331 ProviderId::from(provider.provider.clone()),
1332 provider.schema_name.clone(),
1333 variant,
1334 ),
1335 ))
1336 }
1337 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1338 },
1339 ProviderRuntimeMetadata::Placeholder => {
1340 Box::new(PlaceholderConnector::from_metadata(provider))
1341 }
1342 }
1343}
1344
/// Stand-in connector for providers that are cataloged but have no concrete
/// implementation yet; inbound normalization always fails.
struct PlaceholderConnector {
    // Provider id copied from the catalog metadata.
    provider_id: ProviderId,
    // Trigger kinds advertised by the catalog entry.
    kinds: Vec<TriggerKind>,
    // Payload schema name advertised by the catalog entry.
    schema_name: String,
}
1350
1351impl PlaceholderConnector {
1352 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1353 Self {
1354 provider_id: ProviderId::from(metadata.provider.clone()),
1355 kinds: metadata
1356 .kinds
1357 .iter()
1358 .cloned()
1359 .map(TriggerKind::from)
1360 .collect(),
1361 schema_name: metadata.schema_name.clone(),
1362 }
1363 }
1364}
1365
1366struct PlaceholderClient;
1367
1368#[async_trait]
1369impl ConnectorClient for PlaceholderClient {
1370 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1371 Err(ClientError::Other(format!(
1372 "connector client method '{method}' is not implemented for this provider"
1373 )))
1374 }
1375}
1376
1377#[async_trait]
1378impl Connector for PlaceholderConnector {
1379 fn provider_id(&self) -> &ProviderId {
1380 &self.provider_id
1381 }
1382
1383 fn kinds(&self) -> &[TriggerKind] {
1384 &self.kinds
1385 }
1386
1387 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1388 Ok(())
1389 }
1390
1391 async fn activate(
1392 &self,
1393 bindings: &[TriggerBinding],
1394 ) -> Result<ActivationHandle, ConnectorError> {
1395 Ok(ActivationHandle::new(
1396 self.provider_id.clone(),
1397 bindings.len(),
1398 ))
1399 }
1400
1401 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1402 Err(ConnectorError::Unsupported(format!(
1403 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1404 self.provider_id.as_str()
1405 )))
1406 }
1407
1408 fn payload_schema(&self) -> ProviderPayloadSchema {
1409 ProviderPayloadSchema::named(self.schema_name.clone())
1410 }
1411
1412 fn client(&self) -> Arc<dyn ConnectorClient> {
1413 Arc::new(PlaceholderClient)
1414 }
1415}
1416
1417pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1418 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1419 *slot.borrow_mut() = clients
1420 .into_iter()
1421 .map(|(provider, client)| (provider.as_str().to_string(), client))
1422 .collect();
1423 });
1424}
1425
1426pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1427 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1428}
1429
1430pub fn clear_active_connector_clients() {
1431 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1432}
1433
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Minimal client stub: echoes the called method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test connector that counts `activate` invocations via a shared atomic.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Records the call, then returns a handle carrying the binding count.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering the same provider id twice must fail with DuplicateProvider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // activate_all must skip connectors that have no trigger bindings.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        // Two bindings for github, none for slack.
        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Exhausting one (provider, key) bucket must not affect other scopes.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        // Capacity 1 with a 60s refill: the second acquire in a scope fails.
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // The raw body bytes must round-trip through json_body() unchanged.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry must contain the catalog's core providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Smoke-test: each recorder call must surface in the Prometheus render
    // with the expected metric family, labels, and value.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        // One exact-match needle per metric family exercised above.
        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}