1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37pub mod stream;
38#[cfg(test)]
39pub(crate) mod test_util;
40pub mod webhook;
41
42pub use a2a_push::A2aPushConnector;
43pub use cron::{CatchupMode, CronConnector};
44pub use github::GitHubConnector;
45pub use harn_module::{
46 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
47};
48pub use hmac::{
49 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
50 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
51 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
52 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
53 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
54 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
55 SIGNATURE_VERIFY_AUDIT_TOPIC,
56};
57pub use linear::LinearConnector;
58pub use notion::{
59 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
60};
61pub use slack::SlackConnector;
62pub use stream::StreamConnector;
63use webhook::WebhookProviderProfile;
64pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
65
/// Shared, async-locked handle to a boxed connector instance.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to an active connector client.
    // NOTE(review): only the declaration is visible in this chunk — confirm
    // the key semantics and accessors elsewhere in the crate.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
73
/// A provider integration that turns raw inbound deliveries into trigger events.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can emit.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with the shared runtime services.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Arm the given bindings; returns a handle describing what was activated.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown. Default implementation is a no-op that ignores the
    /// deadline and reports success.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Convert one raw inbound delivery into a single trigger event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization that may produce a batch, an immediate HTTP
    /// response, or a rejection. The default delegates to
    /// [`Connector::normalize_inbound`] and wraps the single event.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing this provider's event payloads.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Client used for outbound calls to the provider.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
117
/// An HTTP response a connector wants returned to the caller (status,
/// headers, and a JSON body).
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    pub status: u16,
    pub headers: BTreeMap<String, String>,
    pub body: JsonValue,
}
125
126impl ConnectorHttpResponse {
127 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
128 Self {
129 status,
130 headers,
131 body,
132 }
133 }
134}
135
/// Outcome of normalizing a raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one trigger event (boxed to keep the enum small).
    Event(Box<TriggerEvent>),
    /// Zero or more trigger events from a single delivery.
    Batch(Vec<TriggerEvent>),
    /// Respond to the caller immediately while still emitting events
    /// (e.g. webhook handshakes that also carry data).
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the delivery with the given response; no events are emitted.
    Reject(ConnectorHttpResponse),
}
147
148impl ConnectorNormalizeResult {
149 pub fn event(event: TriggerEvent) -> Self {
150 Self::Event(Box::new(event))
151 }
152
153 pub fn into_events(self) -> Vec<TriggerEvent> {
154 match self {
155 Self::Event(event) => vec![*event],
156 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
157 Self::Reject(_) => Vec::new(),
158 }
159 }
160}
161
/// Result of post-normalization processing (dedupe claiming).
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event claimed (or dedupe disabled) and ready for dispatch.
    Ready(Box<TriggerEvent>),
    /// Event was a duplicate and has been dropped.
    DuplicateDropped,
}
167
168pub async fn postprocess_normalized_event(
169 inbox: &InboxIndex,
170 binding_id: &str,
171 dedupe_enabled: bool,
172 dedupe_ttl: StdDuration,
173 mut event: TriggerEvent,
174) -> Result<PostNormalizeOutcome, ConnectorError> {
175 if dedupe_enabled && !event.dedupe_claimed() {
176 if !inbox
177 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
178 .await?
179 {
180 return Ok(PostNormalizeOutcome::DuplicateDropped);
181 }
182 event.mark_dedupe_claimed();
183 }
184
185 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
186}
187
/// Outbound RPC-style client a connector exposes: a named method plus JSON
/// arguments, returning a JSON result.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
193
/// Errors from [`ConnectorClient::call`]; every variant carries a message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    MethodNotFound(String),
    InvalidArgs(String),
    RateLimited(String),
    Transport(String),
    Other(String),
}
203
204impl fmt::Display for ClientError {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206 match self {
207 Self::MethodNotFound(message)
208 | Self::InvalidArgs(message)
209 | Self::RateLimited(message)
210 | Self::Transport(message)
211 | Self::Other(message) => message.fmt(f),
212 }
213 }
214}
215
216impl std::error::Error for ClientError {}
217
/// Errors surfaced by connector initialization, activation, and
/// inbound-delivery normalization.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider is already registered.
    DuplicateProvider(String),
    /// The same delivery was seen before (dedupe).
    DuplicateDelivery(String),
    /// No connector registered for the named provider.
    UnknownProvider(String),
    /// A required HTTP header was absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// A signed timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event-log append/read failure.
    EventLog(String),
    /// Failure inside the harn connector runtime.
    HarnRuntime(String),
    /// Wrapped outbound-client error.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failed.
    Activation(String),
}
243
244impl ConnectorError {
245 pub fn invalid_signature(message: impl Into<String>) -> Self {
246 Self::InvalidSignature(message.into())
247 }
248}
249
250impl fmt::Display for ConnectorError {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 match self {
253 Self::DuplicateProvider(provider) => {
254 write!(f, "connector provider `{provider}` is already registered")
255 }
256 Self::DuplicateDelivery(message) => message.fmt(f),
257 Self::UnknownProvider(provider) => {
258 write!(f, "connector provider `{provider}` is not registered")
259 }
260 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
261 Self::InvalidHeader { name, detail } => {
262 write!(f, "invalid header `{name}`: {detail}")
263 }
264 Self::InvalidSignature(message)
265 | Self::Json(message)
266 | Self::Secret(message)
267 | Self::EventLog(message)
268 | Self::HarnRuntime(message)
269 | Self::Unsupported(message)
270 | Self::Activation(message) => message.fmt(f),
271 Self::TimestampOutOfWindow {
272 timestamp,
273 now,
274 window,
275 } => write!(
276 f,
277 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
278 ),
279 Self::Client(error) => error.fmt(f),
280 }
281 }
282}
283
impl std::error::Error for ConnectorError {}

// Lossy conversion: log errors are flattened to their display string.
impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

// Lossy conversion: secret errors are flattened to their display string.
impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

// Lossy conversion: serde errors are flattened to their display string.
impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

// Client errors are preserved intact so callers can still match on them.
impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
309
/// Shared runtime services handed to a connector at `init` time.
/// Cheap to clone: every field is an `Arc`.
#[derive(Clone)]
pub struct ConnectorCtx {
    pub event_log: Arc<AnyEventLog>,
    pub secrets: Arc<dyn SecretProvider>,
    pub inbox: Arc<InboxIndex>,
    pub metrics: Arc<MetricsRegistry>,
    pub rate_limiter: Arc<RateLimiterFactory>,
}
319
/// Point-in-time copy of the fixed (non-labeled) counters kept by
/// [`MetricsRegistry`]; produced by [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    // Gauge-like: stores the latest count, not a running total.
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
336
/// Label set for one metric sample; BTreeMap keeps label order deterministic.
type MetricLabels = BTreeMap<String, String>;

/// Cumulative Prometheus histogram: bucket upper bound (rendered as the
/// `le` label string) -> cumulative count, plus total count and sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
345
346static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
347
348pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
349 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
350 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
351}
352
353pub fn clear_active_metrics_registry() {
354 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
355 *slot.lock().expect("active metrics registry poisoned") = None;
356 }
357}
358
359pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
360 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
361 slot.lock()
362 .expect("active metrics registry poisoned")
363 .clone()
364 })
365}
366
/// Central metrics store: a fixed set of lock-free atomic counters for hot
/// paths, plus mutex-guarded maps for labeled counters, gauges, and
/// histograms rendered in Prometheus text format.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Gauge-like: `store`d, not added to.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters keyed by raw (unsanitized) name.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labeled metric families; keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
}
387
impl MetricsRegistry {
    /// Cumulative histogram bounds (seconds) shared by duration metrics.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Cumulative histogram bounds (bytes) for body/payload size metrics.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Copy the fixed atomic counters into an owned snapshot. Relaxed loads
    /// are sufficient: values are independent and read only for reporting.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Count a new dedupe claim written to the inbox.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate caught by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a duplicate caught by the durable store.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count inbox entries evicted by TTL expiry (no-op for zero).
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Set (not add) the current number of live inbox entries.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Count a Linear webhook rejected for a stale timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Count a failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Add to a free-form counter keyed by `name`. Zero increments are
    /// skipped so the counter is not created as a side effect.
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        *counters.entry(name.to_string()).or_default() += amount;
    }

    /// Record one HTTP request: a labeled request counter plus duration and
    /// body-size histograms keyed by endpoint.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }

    /// Count a trigger event received from a provider.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Count a trigger event dropped by dedupe, labeled with the reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Record a predicate evaluation: outcome counter plus a USD-cost
    /// histogram (cost clamped to non-negative).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Count a trigger dispatched to a handler, labeled with outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Count a trigger retry, labeled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Count a trigger routed to the dead-letter queue.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Set the in-flight gauge for one trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Set today's accumulated budget cost for one trigger (clamped >= 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Count a budget exhaustion, labeled with the enforcement strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Record one event-log append: duration histogram plus a gauge of the
    /// last appended payload size for the topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Set the lag gauge for one (topic, consumer) pair.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Record one agent-to-agent hop: outcome counter plus duration histogram.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Set the depth gauge for one worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Record the age of a claimed queue item (clamped >= 0).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Set the pump-backlog gauge for one topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Set the pump-outstanding gauge for one topic.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Record how long an event waited before pump admission.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Record one LLM call. Zero/negative cost still materializes the cost
    /// counter at 0 so the series exists for scrapes.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Count an LLM cache hit for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }

    /// Render everything in Prometheus text exposition format: the fixed
    /// counters (some under legacy names), sanitized custom counters, two
    /// hard-coded config gauges, then the labeled metric families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize the raw name into a valid metric identifier. Distinct
            // raw names can collapse to the same sanitized name.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static configuration values exposed as gauges.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }

    /// Add to a labeled counter; non-positive and non-finite amounts are
    /// dropped (counters must be monotonic).
    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
        let amount = amount.into();
        if amount <= 0.0 || !amount.is_finite() {
            return;
        }
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        *counters.entry((name.to_string(), labels)).or_default() += amount;
    }

    /// Materialize a labeled counter at 0 without incrementing it.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Overwrite a labeled gauge with `value`.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }

    /// Record one observation into a cumulative histogram. The bucket set is
    /// fixed at first observation; non-finite values are dropped.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        // Cumulative semantics: every bucket whose bound >= value counts it.
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }

    /// Append the labeled counter/gauge/histogram families to `rendered`.
    /// Only families listed in `metric_family_names` are emitted. Maps are
    /// cloned so no lock is held while formatting.
    fn render_generic_metrics(&self, rendered: &mut String) {
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
}
898
/// The three Prometheus metric families the registry renders.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
905
/// Registered metric family names per kind; render order follows slice order.
/// NOTE(review): `render_generic_metrics` only emits families listed here —
/// any new metric recorded via `increment_counter`/`set_gauge`/
/// `observe_histogram` must also be added to the matching slice or it will
/// silently never be rendered.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
        ],
    }
}
942
943fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
944 pairs
945 .into_iter()
946 .map(|(name, value)| (name.to_string(), value.to_string()))
947 .collect()
948}
949
950fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
951 rendered.push_str(name);
952 if !labels.is_empty() {
953 rendered.push('{');
954 for (index, (label, label_value)) in labels.iter().enumerate() {
955 if index > 0 {
956 rendered.push(',');
957 }
958 rendered.push_str(label);
959 rendered.push_str("=\"");
960 rendered.push_str(&escape_label_value(label_value));
961 rendered.push('"');
962 }
963 rendered.push('}');
964 }
965 rendered.push(' ');
966 rendered.push_str(&prometheus_float(value));
967 rendered.push('\n');
968}
969
/// Escape a Prometheus label value for the text exposition format:
/// backslash, double-quote, and line feed must be backslash-escaped.
fn escape_label_value(value: &str) -> String {
    // Build into a single buffer instead of flat_map over per-char Vec<char>
    // allocations; output is identical, with one allocation in the common case.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
981
/// Format a float for Prometheus text exposition.
///
/// Non-finite values use the spelling the format requires (`+Inf`, `-Inf`,
/// `NaN`); the original code fell through to `format!` for negative infinity
/// and produced the invalid token `-inf`. Whole numbers render without a
/// fractional part; other values render with up to six decimals, trailing
/// zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Prometheus requires capitalized Inf with an explicit sign word.
        return if value.is_sign_positive() {
            "+Inf".to_string()
        } else {
            "-Inf".to_string()
        };
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
996
/// Schema descriptor for a provider's payloads: a registered harn schema
/// name plus an optional inline JSON Schema (Null when absent).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    pub harn_schema_name: String,
    #[serde(default)]
    pub json_schema: JsonValue,
}
1004
1005impl ProviderPayloadSchema {
1006 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1007 Self {
1008 harn_schema_name: harn_schema_name.into(),
1009 json_schema,
1010 }
1011 }
1012
1013 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1014 Self::new(harn_schema_name, JsonValue::Null)
1015 }
1016}
1017
1018impl Default for ProviderPayloadSchema {
1019 fn default() -> Self {
1020 Self::named("raw")
1021 }
1022}
1023
/// Newtype over an arbitrary trigger-kind string; serialized transparently
/// as the inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1028
1029impl TriggerKind {
1030 pub fn new(value: impl Into<String>) -> Self {
1031 Self(value.into())
1032 }
1033
1034 pub fn as_str(&self) -> &str {
1035 self.0.as_str()
1036 }
1037}
1038
1039impl From<&str> for TriggerKind {
1040 fn from(value: &str) -> Self {
1041 Self::new(value)
1042 }
1043}
1044
1045impl From<String> for TriggerKind {
1046 fn from(value: String) -> Self {
1047 Self::new(value)
1048 }
1049}
1050
/// One configured trigger: which provider and kind it listens for, its
/// identity, dedupe settings, and provider-specific configuration.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    pub provider: ProviderId,
    pub kind: TriggerKind,
    pub binding_id: String,
    /// Optional override for how the dedupe key is derived.
    /// NOTE(review): exact semantics of this field are not visible in this
    /// chunk — confirm against the dedupe-key derivation code.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration blob (Null when absent).
    #[serde(default)]
    pub config: JsonValue,
}
1064
1065impl TriggerBinding {
1066 pub fn new(
1067 provider: ProviderId,
1068 kind: impl Into<TriggerKind>,
1069 binding_id: impl Into<String>,
1070 ) -> Self {
1071 Self {
1072 provider,
1073 kind: kind.into(),
1074 binding_id: binding_id.into(),
1075 dedupe_key: None,
1076 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1077 config: JsonValue::Null,
1078 }
1079 }
1080}
1081
/// Serde default for `TriggerBinding::dedupe_retention_days`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1085
/// In-memory index of trigger bindings grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1091
1092impl TriggerRegistry {
1093 pub fn register(&mut self, binding: TriggerBinding) {
1094 self.bindings
1095 .entry(binding.provider.clone())
1096 .or_default()
1097 .push(binding);
1098 }
1099
1100 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1101 &self.bindings
1102 }
1103
1104 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1105 self.bindings
1106 .get(provider)
1107 .map(Vec::as_slice)
1108 .unwrap_or(&[])
1109 }
1110}
1111
/// Result of [`Connector::activate`]: which provider was armed and how many
/// bindings it covers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    pub provider: ProviderId,
    pub binding_count: usize,
}
1118
1119impl ActivationHandle {
1120 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1121 Self {
1122 provider,
1123 binding_count,
1124 }
1125 }
1126}
1127
/// A raw inbound delivery before normalization: transport envelope (headers,
/// query, body bytes) plus timing, tenant, and free-form metadata.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    pub kind: String,
    pub headers: BTreeMap<String, String>,
    pub query: BTreeMap<String, String>,
    pub body: Vec<u8>,
    // When we received it (stamped from the crate clock).
    pub received_at: OffsetDateTime,
    // When the provider says it happened, if known.
    pub occurred_at: Option<OffsetDateTime>,
    pub tenant_id: Option<TenantId>,
    pub metadata: JsonValue,
}
1140
1141impl RawInbound {
1142 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1143 Self {
1144 kind: kind.into(),
1145 headers,
1146 query: BTreeMap::new(),
1147 body,
1148 received_at: clock::now_utc(),
1149 occurred_at: None,
1150 tenant_id: None,
1151 metadata: JsonValue::Null,
1152 }
1153 }
1154
1155 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1156 Ok(serde_json::from_slice(&self.body)?)
1157 }
1158}
1159
/// Token-bucket parameters: maximum burst `capacity`, with `refill_tokens`
/// added every `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}
1167
1168impl Default for RateLimitConfig {
1169 fn default() -> Self {
1170 Self {
1171 capacity: 60,
1172 refill_tokens: 1,
1173 refill_interval: StdDuration::from_secs(1),
1174 }
1175 }
1176}
1177
/// Mutable token-bucket state for one `(provider, key)` scope.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional token balance; consumed in whole units of 1.0.
    tokens: f64,
    // Instant of the last refill credit (from the mockable clock).
    last_refill: ClockInstant,
}
1183
1184impl TokenBucket {
1185 fn full(config: RateLimitConfig) -> Self {
1186 Self {
1187 tokens: config.capacity as f64,
1188 last_refill: clock::instant_now(),
1189 }
1190 }
1191
1192 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1193 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1194 let rate = config.refill_tokens.max(1) as f64 / interval;
1195 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1196 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1197 self.last_refill = now;
1198 }
1199
1200 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1201 self.refill(config, now);
1202 if self.tokens >= 1.0 {
1203 self.tokens -= 1.0;
1204 true
1205 } else {
1206 false
1207 }
1208 }
1209
1210 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1211 if self.tokens >= 1.0 {
1212 return StdDuration::ZERO;
1213 }
1214 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1215 let rate = config.refill_tokens.max(1) as f64 / interval;
1216 let missing = (1.0 - self.tokens).max(0.0);
1217 StdDuration::from_secs_f64((missing / rate).max(0.001))
1218 }
1219}
1220
/// Creates and caches token buckets keyed by `(provider, key)`, all
/// sharing one `RateLimitConfig`.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    // One bucket per (provider name, scope key). Guarded by a std Mutex;
    // the guard is never held across an await.
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1227
1228impl RateLimiterFactory {
1229 pub fn new(config: RateLimitConfig) -> Self {
1230 Self {
1231 config,
1232 buckets: Mutex::new(HashMap::new()),
1233 }
1234 }
1235
1236 pub fn config(&self) -> RateLimitConfig {
1237 self.config
1238 }
1239
1240 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1241 ScopedRateLimiter {
1242 factory: self,
1243 provider: provider.clone(),
1244 key: key.into(),
1245 }
1246 }
1247
1248 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1249 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1250 let bucket = buckets
1251 .entry((provider.as_str().to_string(), key.to_string()))
1252 .or_insert_with(|| TokenBucket::full(self.config));
1253 bucket.try_acquire(self.config, clock::instant_now())
1254 }
1255
1256 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1257 loop {
1258 let wait = {
1259 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1260 let bucket = buckets
1261 .entry((provider.as_str().to_string(), key.to_string()))
1262 .or_insert_with(|| TokenBucket::full(self.config));
1263 if bucket.try_acquire(self.config, clock::instant_now()) {
1264 return;
1265 }
1266 bucket.wait_duration(self.config)
1267 };
1268 tokio::time::sleep(wait).await;
1269 }
1270 }
1271}
1272
impl Default for RateLimiterFactory {
    /// Factory using the default `RateLimitConfig` (burst 60, 1 token/s).
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1278
/// Rate limiter pre-bound to a single `(provider, key)` scope, borrowing
/// the shared factory that owns the buckets.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    factory: &'a RateLimiterFactory,
    provider: ProviderId,
    key: String,
}
1286
1287impl<'a> ScopedRateLimiter<'a> {
1288 pub fn try_acquire(&self) -> bool {
1289 self.factory.try_acquire(&self.provider, &self.key)
1290 }
1291
1292 pub async fn acquire(&self) {
1293 self.factory.acquire(&self.provider, &self.key).await;
1294 }
1295}
1296
/// Owns the connector instance for each provider.
// NOTE(review): `ConnectorHandle` is declared elsewhere in this file;
// `register` wraps connectors in `Arc<AsyncMutex<_>>`, so handles are
// cloneable shared references — confirm the alias matches.
pub struct ConnectorRegistry {
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1301
1302impl ConnectorRegistry {
1303 pub fn empty() -> Self {
1304 Self {
1305 connectors: BTreeMap::new(),
1306 }
1307 }
1308
1309 pub fn with_defaults() -> Self {
1310 let mut registry = Self::empty();
1311 for provider in registered_provider_metadata() {
1312 registry
1313 .register(default_connector_for_provider(&provider))
1314 .expect("default connector registration should not fail");
1315 }
1316 registry
1317 }
1318
1319 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1320 let provider = connector.provider_id().clone();
1321 if self.connectors.contains_key(&provider) {
1322 return Err(ConnectorError::DuplicateProvider(provider.0));
1323 }
1324 self.connectors
1325 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1326 Ok(())
1327 }
1328
1329 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1330 self.connectors.get(id).cloned()
1331 }
1332
1333 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1334 self.connectors.remove(id)
1335 }
1336
1337 pub fn list(&self) -> Vec<ProviderId> {
1338 self.connectors.keys().cloned().collect()
1339 }
1340
1341 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1342 for connector in self.connectors.values() {
1343 connector.lock().await.init(ctx.clone()).await?;
1344 }
1345 Ok(())
1346 }
1347
1348 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1349 let mut clients = BTreeMap::new();
1350 for (provider, connector) in &self.connectors {
1351 let client = connector.lock().await.client();
1352 clients.insert(provider.clone(), client);
1353 }
1354 clients
1355 }
1356
1357 pub async fn activate_all(
1358 &self,
1359 registry: &TriggerRegistry,
1360 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1361 let mut handles = Vec::new();
1362 for (provider, connector) in &self.connectors {
1363 let bindings = registry.bindings_for(provider);
1364 if bindings.is_empty() {
1365 continue;
1366 }
1367 let connector = connector.lock().await;
1368 handles.push(connector.activate(bindings).await?);
1369 }
1370 Ok(handles)
1371 }
1372}
1373
impl Default for ConnectorRegistry {
    /// Defaults to the full catalog-backed registry, not an empty one.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1379
1380fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1381 if provider.provider == "github" {
1391 return Box::new(GitHubConnector::new());
1392 }
1393 if provider.provider == "linear" {
1394 return Box::new(LinearConnector::new());
1395 }
1396 if provider.provider == "slack" {
1397 return Box::new(SlackConnector::new());
1398 }
1399 if provider.provider == "notion" {
1400 return Box::new(NotionConnector::new());
1401 }
1402 if provider.provider == "a2a-push" {
1403 return Box::new(A2aPushConnector::new());
1404 }
1405 match &provider.runtime {
1406 ProviderRuntimeMetadata::Builtin {
1407 connector,
1408 default_signature_variant,
1409 } => match connector.as_str() {
1410 "cron" => Box::new(CronConnector::new()),
1411 "stream" => Box::new(StreamConnector::new(
1412 ProviderId::from(provider.provider.clone()),
1413 provider.schema_name.clone(),
1414 )),
1415 "webhook" => {
1416 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1417 .expect("catalog webhook signature variant must be valid");
1418 Box::new(GenericWebhookConnector::with_profile(
1419 WebhookProviderProfile::new(
1420 ProviderId::from(provider.provider.clone()),
1421 provider.schema_name.clone(),
1422 variant,
1423 ),
1424 ))
1425 }
1426 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1427 },
1428 ProviderRuntimeMetadata::Placeholder => {
1429 Box::new(PlaceholderConnector::from_metadata(provider))
1430 }
1431 }
1432}
1433
/// Stand-in connector for cataloged providers that have no concrete
/// implementation yet; activation succeeds but inbound payloads are
/// rejected.
struct PlaceholderConnector {
    provider_id: ProviderId,
    kinds: Vec<TriggerKind>,
    schema_name: String,
}
1439
1440impl PlaceholderConnector {
1441 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1442 Self {
1443 provider_id: ProviderId::from(metadata.provider.clone()),
1444 kinds: metadata
1445 .kinds
1446 .iter()
1447 .cloned()
1448 .map(TriggerKind::from)
1449 .collect(),
1450 schema_name: metadata.schema_name.clone(),
1451 }
1452 }
1453}
1454
/// Client stub whose every call fails; paired with `PlaceholderConnector`.
struct PlaceholderClient;
1456
1457#[async_trait]
1458impl ConnectorClient for PlaceholderClient {
1459 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1460 Err(ClientError::Other(format!(
1461 "connector client method '{method}' is not implemented for this provider"
1462 )))
1463 }
1464}
1465
1466#[async_trait]
1467impl Connector for PlaceholderConnector {
1468 fn provider_id(&self) -> &ProviderId {
1469 &self.provider_id
1470 }
1471
1472 fn kinds(&self) -> &[TriggerKind] {
1473 &self.kinds
1474 }
1475
1476 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1477 Ok(())
1478 }
1479
1480 async fn activate(
1481 &self,
1482 bindings: &[TriggerBinding],
1483 ) -> Result<ActivationHandle, ConnectorError> {
1484 Ok(ActivationHandle::new(
1485 self.provider_id.clone(),
1486 bindings.len(),
1487 ))
1488 }
1489
1490 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1491 Err(ConnectorError::Unsupported(format!(
1492 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1493 self.provider_id.as_str()
1494 )))
1495 }
1496
1497 fn payload_schema(&self) -> ProviderPayloadSchema {
1498 ProviderPayloadSchema::named(self.schema_name.clone())
1499 }
1500
1501 fn client(&self) -> Arc<dyn ConnectorClient> {
1502 Arc::new(PlaceholderClient)
1503 }
1504}
1505
1506pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1507 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1508 *slot.borrow_mut() = clients
1509 .into_iter()
1510 .map(|(provider, client)| (provider.as_str().to_string(), client))
1511 .collect();
1512 });
1513}
1514
/// Looks up the client installed for `provider` on this thread, if any.
pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
}
1518
/// Drops every client installed on this thread (test/shutdown hygiene).
pub fn clear_active_connector_clients() {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
}
1522
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Client whose calls always succeed, echoing the method name back.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Minimal connector that records how many times `activate` ran.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        // Shared counter so the test can observe activations.
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // A second `register` for an already-registered provider must fail
    // with `DuplicateProvider` carrying the provider name.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // `activate_all` must skip connectors with no trigger bindings: only
    // github (2 bindings) activates; slack is never called.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Buckets are independent per (provider, key): exhausting one scope
    // must not affect a different key or a different provider.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body` parses the stored body bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry is catalog-backed and includes the builtin
    // cron/github/webhook providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Smoke test for the Prometheus export: record one sample for every
    // metric family, then assert each expected rendered line appears.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        // Each needle is an exact rendered line (labels sorted by name).
        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}