1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod github;
33pub mod harn_module;
34pub mod hmac;
35pub mod linear;
36pub mod notion;
37pub mod slack;
38pub mod stream;
39#[cfg(test)]
40pub(crate) mod test_util;
41pub mod testkit;
42pub mod webhook;
43
44pub use a2a_push::A2aPushConnector;
45pub use cron::{CatchupMode, CronConnector};
46pub use effect_policy::{
47 connector_export_denied_builtin_reason, connector_export_effect_class,
48 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
49};
50pub use github::GitHubConnector;
51pub use harn_module::{
52 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
53};
54pub use hmac::{
55 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
56 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
57 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
58 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
59 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
60 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
61 SIGNATURE_VERIFY_AUDIT_TOPIC,
62};
63pub use linear::LinearConnector;
64pub use notion::{
65 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
66};
67pub use slack::SlackConnector;
68pub use stream::StreamConnector;
69use webhook::WebhookProviderProfile;
70pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
71
/// Hard cap applied to every outbound connector HTTP request.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
73
/// Providers whose Rust connector keeps compatibility behavior with the
/// provider-connector layer.
pub const RUST_PROVIDER_CONNECTOR_COMPAT_PROVIDERS: &[&str] =
    &["github", "linear", "notion", "slack"];

/// Reports whether `provider` is one of the compat providers above.
pub fn is_rust_provider_connector_compat_provider(provider: &str) -> bool {
    RUST_PROVIDER_CONNECTOR_COMPAT_PROVIDERS
        .iter()
        .any(|compat| *compat == provider)
}
84
85pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
86 reqwest::Client::builder()
87 .user_agent(user_agent)
88 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
89 .redirect(reqwest::redirect::Policy::custom(|attempt| {
90 if attempt.previous().len() >= 10 {
91 attempt.error("too many redirects")
92 } else if crate::egress::redirect_url_allowed(
93 "connector_redirect",
94 attempt.url().as_str(),
95 ) {
96 attempt.follow()
97 } else {
98 attempt.error("egress policy blocked redirect target")
99 }
100 }))
101 .build()
102 .expect("connector HTTP client configuration should be valid")
103}
104
/// Shared, async-locked handle to a boxed connector implementation.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map of key -> active connector client.
    // NOTE(review): no consumers are visible in this chunk — presumably used
    // by test/runtime plumbing on the current thread; confirm intended scope.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
112
/// A trigger-provider integration (e.g. GitHub, Slack, cron).
///
/// Implementations turn provider-specific inbound deliveries into normalized
/// [`TriggerEvent`]s and expose an RPC-style [`ConnectorClient`] for
/// outbound calls.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can emit.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with shared runtime services.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the given trigger bindings.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown; the default implementation is a no-op that
    /// ignores the deadline.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts one raw inbound delivery into a single normalized event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization entry point. The default wraps
    /// [`Connector::normalize_inbound`] into a single-event result; override
    /// to emit batches, immediate HTTP responses, or rejections.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing this provider's normalized payloads.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// RPC-style client for outbound provider calls.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
156
/// A concrete HTTP response a connector asks the transport layer to return
/// to the sender (handshakes, immediate acks, rejections).
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code.
    pub status: u16,
    /// Response headers (name -> value).
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}

impl ConnectorHttpResponse {
    /// Assembles a response from its parts.
    pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
        Self {
            status,
            headers,
            body,
        }
    }
}
174
/// Outcome of normalizing one raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one event (boxed to keep the variant small).
    Event(Box<TriggerEvent>),
    /// Several events produced by a single delivery.
    Batch(Vec<TriggerEvent>),
    /// Respond to the sender immediately while still emitting `events`.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Reject the delivery with the given response; no events are emitted.
    Reject(ConnectorHttpResponse),
}
186
187impl ConnectorNormalizeResult {
188 pub fn event(event: TriggerEvent) -> Self {
189 Self::Event(Box::new(event))
190 }
191
192 pub fn into_events(self) -> Vec<TriggerEvent> {
193 match self {
194 Self::Event(event) => vec![*event],
195 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
196 Self::Reject(_) => Vec::new(),
197 }
198 }
199}
200
/// Result of dedupe post-processing for a normalized event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe and may be dispatched.
    Ready(Box<TriggerEvent>),
    /// Event was a duplicate and has been dropped.
    DuplicateDropped,
}
206
207pub async fn postprocess_normalized_event(
208 inbox: &InboxIndex,
209 binding_id: &str,
210 dedupe_enabled: bool,
211 dedupe_ttl: StdDuration,
212 mut event: TriggerEvent,
213) -> Result<PostNormalizeOutcome, ConnectorError> {
214 if dedupe_enabled && !event.dedupe_claimed() {
215 if !inbox
216 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
217 .await?
218 {
219 return Ok(PostNormalizeOutcome::DuplicateDropped);
220 }
221 event.mark_dedupe_claimed();
222 }
223
224 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
225}
226
/// Minimal RPC surface a connector exposes for outbound provider calls.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with JSON `args` and returns the JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
232
/// Errors produced by [`ConnectorClient::call`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested RPC method does not exist.
    MethodNotFound(String),
    /// Arguments failed validation.
    InvalidArgs(String),
    /// The provider rate-limited the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Egress policy refused the destination.
    EgressBlocked(crate::egress::EgressBlocked),
    /// Any other failure.
    Other(String),
}
243
244impl fmt::Display for ClientError {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 match self {
247 Self::MethodNotFound(message)
248 | Self::InvalidArgs(message)
249 | Self::RateLimited(message)
250 | Self::Transport(message)
251 | Self::Other(message) => message.fmt(f),
252 Self::EgressBlocked(blocked) => blocked.fmt(f),
253 }
254 }
255}
256
257impl std::error::Error for ClientError {}
258
/// Errors surfaced by connector registration, inbound verification, and
/// dispatch.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider is already registered.
    DuplicateProvider(String),
    /// The same delivery was seen before (dedupe hit).
    DuplicateDelivery(String),
    /// No connector is registered for the named provider.
    UnknownProvider(String),
    /// A required inbound header is absent.
    MissingHeader(String),
    /// An inbound header is present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// Inbound timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup failure.
    Secret(String),
    /// Event log failure.
    EventLog(String),
    /// Harn module runtime failure.
    HarnRuntime(String),
    /// Outbound client call failure (kept structured).
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}
284
285impl ConnectorError {
286 pub fn invalid_signature(message: impl Into<String>) -> Self {
287 Self::InvalidSignature(message.into())
288 }
289}
290
impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DuplicateProvider(provider) => {
                write!(f, "connector provider `{provider}` is already registered")
            }
            // Message-bearing variants display their message verbatim.
            Self::DuplicateDelivery(message) => message.fmt(f),
            Self::UnknownProvider(provider) => {
                write!(f, "connector provider `{provider}` is not registered")
            }
            Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
            Self::InvalidHeader { name, detail } => {
                write!(f, "invalid header `{name}`: {detail}")
            }
            Self::InvalidSignature(message)
            | Self::Json(message)
            | Self::Secret(message)
            | Self::EventLog(message)
            | Self::HarnRuntime(message)
            | Self::Unsupported(message)
            | Self::Activation(message) => message.fmt(f),
            Self::TimestampOutOfWindow {
                timestamp,
                now,
                window,
            } => write!(
                f,
                "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
            ),
            // Delegates to `ClientError`'s own Display.
            Self::Client(error) => error.fmt(f),
        }
    }
}
324
// Marker impl: all detail lives in `Display`/`Debug`; no source chaining.
impl std::error::Error for ConnectorError {}

// Lossy conversions: underlying errors are flattened to strings so the
// variants stay simple and cheap to pass around.
impl From<crate::event_log::LogError> for ConnectorError {
    fn from(value: crate::event_log::LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}

impl From<crate::secrets::SecretError> for ConnectorError {
    fn from(value: crate::secrets::SecretError) -> Self {
        Self::Secret(value.to_string())
    }
}

impl From<serde_json::Error> for ConnectorError {
    fn from(value: serde_json::Error) -> Self {
        Self::Json(value.to_string())
    }
}

// `ClientError` is preserved structured rather than stringified.
impl From<ClientError> for ConnectorError {
    fn from(value: ClientError) -> Self {
        Self::Client(value)
    }
}
350
/// Shared runtime services handed to each connector via [`Connector::init`].
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log for normalized events and audit records.
    pub event_log: Arc<AnyEventLog>,
    /// Secret material lookup (signing keys, API tokens).
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Process-wide metrics sink.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for per-connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
360
/// Point-in-time copy of the fixed connector counters, produced by
/// [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
377
// Sorted label set for a metric sample; BTreeMap keeps label order
// deterministic so (name, labels) keys compare and render stably.
type MetricLabels = BTreeMap<String, String>;

/// Cumulative histogram state: per-`le` bucket counts plus running
/// observation count and value sum.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    // Keyed by the Prometheus `le` bound rendered as a string (incl. "+Inf").
    buckets: BTreeMap<String, u64>,
    count: u64,
    sum: f64,
}
386
387static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
388
389pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
390 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
391 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
392}
393
394pub fn clear_active_metrics_registry() {
395 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
396 *slot.lock().expect("active metrics registry poisoned") = None;
397 }
398}
399
400pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
401 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
402 slot.lock()
403 .expect("active metrics registry poisoned")
404 .clone()
405 })
406}
407
/// Process-wide metrics sink for connectors and the trigger pipeline.
///
/// Fixed hot-path counters are lock-free atomics; everything else lives in
/// mutex-guarded maps keyed by (metric name, label set) and is rendered by
/// `render_prometheus`.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed counters exposed via `ConnectorMetricsSnapshot`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form named counters (sanitized into metric names at render time).
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Generic labeled families.
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events per label set: event_id -> accepted_at (ms).
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
429
430impl MetricsRegistry {
    /// Latency buckets (seconds) for short operations.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Wider latency buckets (seconds) for end-to-end trigger lifecycle spans.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Payload size buckets (bytes).
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Samples all fixed counters into an owned snapshot. Relaxed ordering is
    /// sufficient: the counters are independent, with no cross-field
    /// invariants to preserve.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }
457
    /// Counts a dedupe claim successfully written to the inbox.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate caught by the durable index.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` newly expired inbox entries (no-op for zero).
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the live inbox-entry gauge.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts a Linear webhook rejected for a timestamp problem.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a Slack delivery that succeeded.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a Slack delivery that failed.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }
513
    /// Adds `amount` to a free-form named counter. Zero increments are
    /// dropped so never-incremented counters do not materialize in renders.
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        *counters.entry(name.to_string()).or_default() += amount;
    }

    /// Records one HTTP request: a labeled request counter plus per-endpoint
    /// duration and body-size histograms. (`labels` is a helper defined
    /// elsewhere in this module.)
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }
555
    /// Counts a trigger event received for (trigger, provider).
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts a trigger event dropped by dedupe, labeled with the reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Counts one predicate evaluation (labeled with its boolean result) and
    /// observes its cost in USD; negative costs are clamped to zero.
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts a dispatch attempt by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts a retry, labeled with the attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts an event sent to the dead-letter queue, labeled with why.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }
624
    // The seven methods below all record one lifecycle latency into a
    // histogram sharing TRIGGER_LATENCY_BUCKETS, labeled via
    // `trigger_lifecycle_labels` (helper defined elsewhere in this module).

    /// Latency from webhook acceptance to normalization.
    pub fn record_trigger_accepted_to_normalized(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Latency from webhook acceptance to queue append.
    pub fn record_trigger_accepted_to_queue_append(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Queue age measured when the event is admitted for dispatch.
    pub fn record_trigger_queue_age_at_dispatch_admission(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Queue age measured when dispatch actually starts.
    pub fn record_trigger_queue_age_at_dispatch_start(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Wall-clock runtime of the dispatch itself.
    pub fn record_trigger_dispatch_runtime(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_dispatch_runtime_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Delay applied before a retry.
    pub fn record_trigger_retry_delay(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_retry_delay_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Latency from acceptance until the event lands in the DLQ.
    pub fn record_trigger_accepted_to_dlq(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_accepted_to_dlq_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }
743
    /// Registers a pending (accepted but not yet resolved) trigger event and
    /// refreshes the oldest-pending-age gauge for its label set.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Scoped so the lock is released before the gauge refresh, which
            // re-locks the same map.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Removes a previously noted pending event (pruning empty label
    /// buckets) and refreshes the oldest-pending-age gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            // Same lock-scoping pattern as `note_trigger_pending_event`.
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }
792
    /// Overwrites the in-flight dispatch gauge for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Overwrites today's accumulated budget cost gauge (clamped to >= 0).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts a budget-exhaustion event, labeled with the applied strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counts a backpressure action taken along the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }
824
    /// Records one event-log append: duration histogram plus a topic-size
    /// gauge set to the appended payload size.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Overwrites a consumer's lag gauge for a topic.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Records one agent-to-agent hop: outcome counter plus duration.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Overwrites a worker queue's depth gauge.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observes how old a claim was when a worker picked it up (clamped >= 0).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }
882
    // Scheduler fairness metrics below are all labeled by
    // (queue, fairness_dimension, fairness_key).

    /// Counts a fairness-aware scheduler selection.
    pub fn record_scheduler_selection(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_selections_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Counts a scheduler deferral (eligible work passed over).
    pub fn record_scheduler_deferral(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_deferrals_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Counts a promotion made to prevent starvation.
    pub fn record_scheduler_starvation_promotion(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_starvation_promotions_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Overwrites the deficit gauge for a fairness key (may be negative).
    pub fn set_scheduler_deficit(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        deficit: i64,
    ) {
        self.set_gauge(
            "harn_scheduler_deficit",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            deficit as f64,
        );
    }

    /// Overwrites the oldest-eligible-work age gauge (input in ms, exported
    /// in seconds).
    pub fn set_scheduler_oldest_eligible_age(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        age_ms: u64,
    ) {
        self.set_gauge(
            "harn_scheduler_oldest_eligible_age_seconds",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            age_ms as f64 / 1000.0,
        );
    }
975
    /// Overwrites the orchestrator pump backlog gauge for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Overwrites the outstanding-work gauge for a topic.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Observes how long an item waited before pump admission.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Records one LLM call and its cost. A zero/negative cost still
    /// materializes the cost counter at 0 (via `ensure_counter`) so the
    /// series exists for dashboards.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts an LLM response served from cache.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }
1032
    /// Renders all metrics in Prometheus text exposition format: the legacy
    /// fixed counters first, then sanitized custom counters, two static
    /// tuning gauges, and finally the generic labeled families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        // Legacy (unlabeled) counter names paired with snapshot values.
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize arbitrary counter names into Prometheus-safe metric
            // names: any non-[A-Za-z0-9_] character becomes '_'.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        // Static tuning constants exported as gauges for alerting rules.
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }
1097
    /// Adds `amount` to a labeled counter; non-positive or non-finite
    /// amounts are ignored so counters stay monotone and NaN-free.
    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
        let amount = amount.into();
        if amount <= 0.0 || !amount.is_finite() {
            return;
        }
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        *counters.entry((name.to_string(), labels)).or_default() += amount;
    }

    /// Materializes a zero-valued counter so the series is exported even
    /// before its first increment.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Overwrites a labeled gauge value.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }
1116
    /// Records `value` into a cumulative histogram, creating the bucket
    /// layout from `bucket_bounds` on first observation for this
    /// (name, labels) pair. Buckets are cumulative Prometheus-style: every
    /// bucket whose bound >= value is incremented, and "+Inf" always counts
    /// the observation. Non-finite values are dropped.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                // `prometheus_float` (defined elsewhere in this module)
                // renders the `le` bound as its exposition-format string.
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }
1149
    /// Recomputes the oldest-pending-event age gauge for one label set;
    /// when no events are pending the gauge is reset to zero.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        // `millis_delta` is a helper defined elsewhere in this module;
        // presumably it converts the ms difference into a Duration.
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }
1166
    /// Renders every counter, gauge and histogram family into `rendered`
    /// using the Prometheus text exposition format. Families are emitted in
    /// the fixed order defined by `metric_family_names`.
    fn render_generic_metrics(&self, rendered: &mut String) {
        // Clone a snapshot under each lock so rendering happens lock-free.
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                // A histogram expands to per-bucket `_bucket` samples (with
                // an extra `le` label) plus `_sum` and `_count` samples.
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
1228}
1229
/// The three Prometheus metric families this registry renders.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Static catalog of exported metric family names per kind.
///
/// `render_generic_metrics` emits a `# TYPE` header for every name listed
/// here, in this order — so the ordering is part of the rendered output and
/// the lists must stay in sync with the record/set methods above.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    match kind {
        MetricKind::Counter => &[
            "harn_http_requests_total",
            "harn_trigger_received_total",
            "harn_trigger_deduped_total",
            "harn_trigger_predicate_evaluations_total",
            "harn_trigger_dispatched_total",
            "harn_trigger_retries_total",
            "harn_trigger_dlq_total",
            "harn_trigger_budget_exhausted_total",
            "harn_backpressure_events_total",
            "harn_a2a_hops_total",
            "harn_llm_calls_total",
            "harn_llm_cost_usd_total",
            "harn_llm_cache_hits_total",
            "harn_scheduler_selections_total",
            "harn_scheduler_deferrals_total",
            "harn_scheduler_starvation_promotions_total",
        ],
        MetricKind::Gauge => &[
            "harn_trigger_inflight",
            "harn_event_log_topic_size_bytes",
            "harn_event_log_consumer_lag",
            "harn_trigger_budget_cost_today_usd",
            "harn_worker_queue_depth",
            "harn_orchestrator_pump_backlog",
            "harn_orchestrator_pump_outstanding",
            "harn_trigger_oldest_pending_age_seconds",
            "harn_scheduler_deficit",
            "harn_scheduler_oldest_eligible_age_seconds",
        ],
        MetricKind::Histogram => &[
            "harn_http_request_duration_seconds",
            "harn_http_body_size_bytes",
            "harn_trigger_predicate_cost_usd",
            "harn_event_log_append_duration_seconds",
            "harn_a2a_hop_duration_seconds",
            "harn_worker_queue_claim_age_seconds",
            "harn_orchestrator_pump_admission_delay_seconds",
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            "harn_trigger_dispatch_runtime_seconds",
            "harn_trigger_retry_delay_seconds",
            "harn_trigger_accepted_to_dlq_seconds",
        ],
    }
}
1287
1288fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1289 pairs
1290 .into_iter()
1291 .map(|(name, value)| (name.to_string(), value.to_string()))
1292 .collect()
1293}
1294
1295fn trigger_lifecycle_labels(
1296 trigger_id: &str,
1297 binding_key: &str,
1298 provider: &str,
1299 tenant_id: Option<&str>,
1300 status: &str,
1301) -> MetricLabels {
1302 labels([
1303 ("binding_key", binding_key),
1304 ("provider", provider),
1305 ("status", status),
1306 ("tenant_id", tenant_label(tenant_id)),
1307 ("trigger_id", trigger_id),
1308 ])
1309}
1310
1311fn trigger_pending_labels(
1312 trigger_id: &str,
1313 binding_key: &str,
1314 provider: &str,
1315 tenant_id: Option<&str>,
1316) -> MetricLabels {
1317 labels([
1318 ("binding_key", binding_key),
1319 ("provider", provider),
1320 ("tenant_id", tenant_label(tenant_id)),
1321 ("trigger_id", trigger_id),
1322 ])
1323}
1324
/// Normalizes an optional tenant id for metric labels: trimmed when present
/// and non-empty, otherwise the literal "none".
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1331
/// Non-negative elapsed time between two unix-millisecond timestamps.
/// Clamps to zero when `earlier_ms` is not actually earlier.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms).max(0);
    StdDuration::from_millis(delta_ms as u64)
}
1335
1336fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1337 rendered.push_str(name);
1338 if !labels.is_empty() {
1339 rendered.push('{');
1340 for (index, (label, label_value)) in labels.iter().enumerate() {
1341 if index > 0 {
1342 rendered.push(',');
1343 }
1344 rendered.push_str(label);
1345 rendered.push_str("=\"");
1346 rendered.push_str(&escape_label_value(label_value));
1347 rendered.push('"');
1348 }
1349 rendered.push('}');
1350 }
1351 rendered.push(' ');
1352 rendered.push_str(&prometheus_float(value));
1353 rendered.push('\n');
1354}
1355
/// Escapes a label value per the Prometheus text exposition format:
/// backslash, double quote and newline get backslash-escaped; everything
/// else passes through unchanged.
fn escape_label_value(value: &str) -> String {
    // Push straight into one output buffer instead of allocating a Vec of
    // chars per input character (the old flat_map approach).
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1367
/// Formats a float the way the Prometheus text format expects: integral
/// values without a decimal point, fractional values trimmed to at most six
/// decimals, and infinities spelled `+Inf` / `-Inf`.
fn prometheus_float(value: f64) -> String {
    if value.is_infinite() {
        // Bug fix: negative infinity previously fell through to the
        // `{:.6}` branch and rendered "-inf", which is not valid
        // exposition-format text ("-Inf" is required).
        return if value.is_sign_positive() {
            "+Inf".to_string()
        } else {
            "-Inf".to_string()
        };
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        format!("{value:.6}")
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1382
1383#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1385pub struct ProviderPayloadSchema {
1386 pub harn_schema_name: String,
1387 #[serde(default)]
1388 pub json_schema: JsonValue,
1389}
1390
1391impl ProviderPayloadSchema {
1392 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1393 Self {
1394 harn_schema_name: harn_schema_name.into(),
1395 json_schema,
1396 }
1397 }
1398
1399 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1400 Self::new(harn_schema_name, JsonValue::Null)
1401 }
1402}
1403
1404impl Default for ProviderPayloadSchema {
1405 fn default() -> Self {
1406 Self::named("raw")
1407 }
1408}
1409
1410#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1412#[serde(transparent)]
1413pub struct TriggerKind(String);
1414
1415impl TriggerKind {
1416 pub fn new(value: impl Into<String>) -> Self {
1417 Self(value.into())
1418 }
1419
1420 pub fn as_str(&self) -> &str {
1421 self.0.as_str()
1422 }
1423}
1424
1425impl From<&str> for TriggerKind {
1426 fn from(value: &str) -> Self {
1427 Self::new(value)
1428 }
1429}
1430
1431impl From<String> for TriggerKind {
1432 fn from(value: String) -> Self {
1433 Self::new(value)
1434 }
1435}
1436
1437#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1439pub struct TriggerBinding {
1440 pub provider: ProviderId,
1441 pub kind: TriggerKind,
1442 pub binding_id: String,
1443 #[serde(default)]
1444 pub dedupe_key: Option<String>,
1445 #[serde(default = "default_dedupe_retention_days")]
1446 pub dedupe_retention_days: u32,
1447 #[serde(default)]
1448 pub config: JsonValue,
1449}
1450
1451impl TriggerBinding {
1452 pub fn new(
1453 provider: ProviderId,
1454 kind: impl Into<TriggerKind>,
1455 binding_id: impl Into<String>,
1456 ) -> Self {
1457 Self {
1458 provider,
1459 kind: kind.into(),
1460 binding_id: binding_id.into(),
1461 dedupe_key: None,
1462 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1463 config: JsonValue::Null,
1464 }
1465 }
1466}
1467
1468fn default_dedupe_retention_days() -> u32 {
1469 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1470}
1471
1472#[derive(Clone, Debug, Default)]
1474pub struct TriggerRegistry {
1475 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1476}
1477
1478impl TriggerRegistry {
1479 pub fn register(&mut self, binding: TriggerBinding) {
1480 self.bindings
1481 .entry(binding.provider.clone())
1482 .or_default()
1483 .push(binding);
1484 }
1485
1486 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1487 &self.bindings
1488 }
1489
1490 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1491 self.bindings
1492 .get(provider)
1493 .map(Vec::as_slice)
1494 .unwrap_or(&[])
1495 }
1496}
1497
1498#[derive(Clone, Debug, PartialEq, Eq)]
1500pub struct ActivationHandle {
1501 pub provider: ProviderId,
1502 pub binding_count: usize,
1503}
1504
1505impl ActivationHandle {
1506 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1507 Self {
1508 provider,
1509 binding_count,
1510 }
1511 }
1512}
1513
1514#[derive(Clone, Debug, PartialEq)]
1516pub struct RawInbound {
1517 pub kind: String,
1518 pub headers: BTreeMap<String, String>,
1519 pub query: BTreeMap<String, String>,
1520 pub body: Vec<u8>,
1521 pub received_at: OffsetDateTime,
1522 pub occurred_at: Option<OffsetDateTime>,
1523 pub tenant_id: Option<TenantId>,
1524 pub metadata: JsonValue,
1525}
1526
1527impl RawInbound {
1528 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1529 Self {
1530 kind: kind.into(),
1531 headers,
1532 query: BTreeMap::new(),
1533 body,
1534 received_at: clock::now_utc(),
1535 occurred_at: None,
1536 tenant_id: None,
1537 metadata: JsonValue::Null,
1538 }
1539 }
1540
1541 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1542 Ok(serde_json::from_slice(&self.body)?)
1543 }
1544}
1545
/// Token-bucket sizing: at most `capacity` tokens, refilled at a rate of
/// `refill_tokens` per `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    // Default: a burst of 60, refilling one token per second.
    fn default() -> Self {
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1563
/// Mutable token-bucket state: fractional tokens currently available, and
/// the clock instant of the last refill.
#[derive(Clone, Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: ClockInstant,
}

impl TokenBucket {
    // Starts a bucket at full capacity, stamped with the current clock.
    fn full(config: RateLimitConfig) -> Self {
        Self {
            tokens: config.capacity as f64,
            last_refill: clock::instant_now(),
        }
    }

    // Credits tokens for the time elapsed since the last refill, capped at
    // capacity. The max(EPSILON)/max(1) guards keep the refill rate finite
    // and non-zero even for degenerate configs (zero interval or tokens).
    fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
        self.last_refill = now;
    }

    // Refills for the elapsed time, then takes one token if available.
    fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
        self.refill(config, now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    // Estimates how long until one token becomes available. Does not refill
    // first — callers invoke this immediately after a failed try_acquire.
    // Clamped to at least 1ms so async waiters never busy-spin.
    fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
        if self.tokens >= 1.0 {
            return StdDuration::ZERO;
        }
        let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
        let rate = config.refill_tokens.max(1) as f64 / interval;
        let missing = (1.0 - self.tokens).max(0.0);
        StdDuration::from_secs_f64((missing / rate).max(0.001))
    }
}
1606
/// Lazily creates and owns one `TokenBucket` per (provider, key) pair;
/// every bucket shares this factory's `RateLimitConfig`.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}

impl RateLimiterFactory {
    pub fn new(config: RateLimitConfig) -> Self {
        Self {
            config,
            buckets: Mutex::new(HashMap::new()),
        }
    }

    /// The configuration applied to every bucket this factory creates.
    pub fn config(&self) -> RateLimitConfig {
        self.config
    }

    /// Borrows this factory as a limiter bound to one (provider, key) scope.
    pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
        ScopedRateLimiter {
            factory: self,
            provider: provider.clone(),
            key: key.into(),
        }
    }

    /// Non-blocking: takes one token from the scoped bucket if available,
    /// creating the bucket at full capacity on first use.
    pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
        let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
        let bucket = buckets
            .entry((provider.as_str().to_string(), key.to_string()))
            .or_insert_with(|| TokenBucket::full(self.config));
        bucket.try_acquire(self.config, clock::instant_now())
    }

    /// Waits (async) until a token can be taken from the scoped bucket.
    pub async fn acquire(&self, provider: &ProviderId, key: &str) {
        loop {
            // The inner block scopes the std Mutex guard so it is dropped
            // before the sleep below — a std guard must not be held across
            // an await point.
            let wait = {
                let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
                let bucket = buckets
                    .entry((provider.as_str().to_string(), key.to_string()))
                    .or_insert_with(|| TokenBucket::full(self.config));
                if bucket.try_acquire(self.config, clock::instant_now()) {
                    return;
                }
                bucket.wait_duration(self.config)
            };
            tokio::time::sleep(wait).await;
        }
    }
}

impl Default for RateLimiterFactory {
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1664
1665#[derive(Clone, Debug)]
1667pub struct ScopedRateLimiter<'a> {
1668 factory: &'a RateLimiterFactory,
1669 provider: ProviderId,
1670 key: String,
1671}
1672
1673impl<'a> ScopedRateLimiter<'a> {
1674 pub fn try_acquire(&self) -> bool {
1675 self.factory.try_acquire(&self.provider, &self.key)
1676 }
1677
1678 pub async fn acquire(&self) {
1679 self.factory.acquire(&self.provider, &self.key).await;
1680 }
1681}
1682
1683pub struct ConnectorRegistry {
1685 connectors: BTreeMap<ProviderId, ConnectorHandle>,
1686}
1687
1688impl ConnectorRegistry {
1689 pub fn empty() -> Self {
1690 Self {
1691 connectors: BTreeMap::new(),
1692 }
1693 }
1694
1695 pub fn with_defaults() -> Self {
1696 let mut registry = Self::empty();
1697 for provider in registered_provider_metadata() {
1698 registry
1699 .register(default_connector_for_provider(&provider))
1700 .expect("default connector registration should not fail");
1701 }
1702 registry
1703 }
1704
1705 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1706 let provider = connector.provider_id().clone();
1707 if self.connectors.contains_key(&provider) {
1708 return Err(ConnectorError::DuplicateProvider(provider.0));
1709 }
1710 self.connectors
1711 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1712 Ok(())
1713 }
1714
1715 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1716 self.connectors.get(id).cloned()
1717 }
1718
1719 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1720 self.connectors.remove(id)
1721 }
1722
1723 pub fn list(&self) -> Vec<ProviderId> {
1724 self.connectors.keys().cloned().collect()
1725 }
1726
1727 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1728 for connector in self.connectors.values() {
1729 connector.lock().await.init(ctx.clone()).await?;
1730 }
1731 Ok(())
1732 }
1733
1734 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1735 let mut clients = BTreeMap::new();
1736 for (provider, connector) in &self.connectors {
1737 let client = connector.lock().await.client();
1738 clients.insert(provider.clone(), client);
1739 }
1740 clients
1741 }
1742
1743 pub async fn activate_all(
1744 &self,
1745 registry: &TriggerRegistry,
1746 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1747 let mut handles = Vec::new();
1748 for (provider, connector) in &self.connectors {
1749 let bindings = registry.bindings_for(provider);
1750 if bindings.is_empty() {
1751 continue;
1752 }
1753 let connector = connector.lock().await;
1754 handles.push(connector.activate(bindings).await?);
1755 }
1756 Ok(handles)
1757 }
1758}
1759
1760impl Default for ConnectorRegistry {
1761 fn default() -> Self {
1762 Self::with_defaults()
1763 }
1764}
1765
1766fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1767 if provider.provider == "github" {
1777 return Box::new(GitHubConnector::new());
1778 }
1779 if provider.provider == "linear" {
1780 return Box::new(LinearConnector::new());
1781 }
1782 if provider.provider == "slack" {
1783 return Box::new(SlackConnector::new());
1784 }
1785 if provider.provider == "notion" {
1786 return Box::new(NotionConnector::new());
1787 }
1788 if provider.provider == "a2a-push" {
1789 return Box::new(A2aPushConnector::new());
1790 }
1791 match &provider.runtime {
1792 ProviderRuntimeMetadata::Builtin {
1793 connector,
1794 default_signature_variant,
1795 } => match connector.as_str() {
1796 "cron" => Box::new(CronConnector::new()),
1797 "stream" => Box::new(StreamConnector::new(
1798 ProviderId::from(provider.provider.clone()),
1799 provider.schema_name.clone(),
1800 )),
1801 "webhook" => {
1802 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1803 .expect("catalog webhook signature variant must be valid");
1804 Box::new(GenericWebhookConnector::with_profile(
1805 WebhookProviderProfile::new(
1806 ProviderId::from(provider.provider.clone()),
1807 provider.schema_name.clone(),
1808 variant,
1809 ),
1810 ))
1811 }
1812 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1813 },
1814 ProviderRuntimeMetadata::Placeholder => {
1815 Box::new(PlaceholderConnector::from_metadata(provider))
1816 }
1817 }
1818}
1819
1820struct PlaceholderConnector {
1821 provider_id: ProviderId,
1822 kinds: Vec<TriggerKind>,
1823 schema_name: String,
1824}
1825
1826impl PlaceholderConnector {
1827 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1828 Self {
1829 provider_id: ProviderId::from(metadata.provider.clone()),
1830 kinds: metadata
1831 .kinds
1832 .iter()
1833 .cloned()
1834 .map(TriggerKind::from)
1835 .collect(),
1836 schema_name: metadata.schema_name.clone(),
1837 }
1838 }
1839}
1840
1841struct PlaceholderClient;
1842
1843#[async_trait]
1844impl ConnectorClient for PlaceholderClient {
1845 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1846 Err(ClientError::Other(format!(
1847 "connector client method '{method}' is not implemented for this provider"
1848 )))
1849 }
1850}
1851
1852#[async_trait]
1853impl Connector for PlaceholderConnector {
1854 fn provider_id(&self) -> &ProviderId {
1855 &self.provider_id
1856 }
1857
1858 fn kinds(&self) -> &[TriggerKind] {
1859 &self.kinds
1860 }
1861
1862 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1863 Ok(())
1864 }
1865
1866 async fn activate(
1867 &self,
1868 bindings: &[TriggerBinding],
1869 ) -> Result<ActivationHandle, ConnectorError> {
1870 Ok(ActivationHandle::new(
1871 self.provider_id.clone(),
1872 bindings.len(),
1873 ))
1874 }
1875
1876 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1877 Err(ConnectorError::Unsupported(format!(
1878 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1879 self.provider_id.as_str()
1880 )))
1881 }
1882
1883 fn payload_schema(&self) -> ProviderPayloadSchema {
1884 ProviderPayloadSchema::named(self.schema_name.clone())
1885 }
1886
1887 fn client(&self) -> Arc<dyn ConnectorClient> {
1888 Arc::new(PlaceholderClient)
1889 }
1890}
1891
1892pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1893 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1894 *slot.borrow_mut() = clients
1895 .into_iter()
1896 .map(|(provider, client)| (provider.as_str().to_string(), client))
1897 .collect();
1898 });
1899}
1900
1901pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1902 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1903}
1904
1905pub fn clear_active_connector_clients() {
1906 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1907}
1908
// Unit tests covering registry registration/activation, the token-bucket
// rate limiter, raw inbound JSON parsing, catalog invariants and the
// Prometheus rendering of the metrics registry.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Connector client stub that echoes back the called method name.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Test connector that counts how often activate() is invoked.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering the same provider twice must fail with DuplicateProvider.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // activate_all must skip connectors with no registered bindings.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Buckets are independent per (provider, key) pair.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // json_body parses the stored raw bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry exposes connectors for catalog providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Guards the pure-Harn pivot: only core transports or explicitly
    // grandfathered compat providers may register as Rust builtins.
    #[test]
    fn pure_harn_pivot_only_keeps_core_or_compat_builtin_connectors() {
        let core_runtime_providers = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        for provider in registered_provider_metadata() {
            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
                continue;
            }

            let allowed_core = core_runtime_providers.contains(&provider.provider.as_str());
            let allowed_compat = is_rust_provider_connector_compat_provider(&provider.provider);
            assert!(
                allowed_core || allowed_compat,
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    // End-to-end render check: record one sample in every metric family,
    // then assert the rendered exposition text contains each expected line.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}