1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44 connector_export_denied_builtin_reason, connector_export_effect_class,
45 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57 SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60 paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61 ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
67const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70 reqwest::Client::builder()
71 .user_agent(user_agent)
72 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73 .redirect(crate::egress::redirect_policy("connector_redirect", 10))
74 .build()
75 .expect("connector HTTP client configuration should be valid")
76}
77
/// Shared handle to a boxed [`Connector`] trait object, guarded by an async
/// (tokio) mutex so it can be locked across `.await` points.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to a connector client; every thread sees
    // its own independent, initially empty map.
    // NOTE(review): not read or written in the visible part of this file;
    // presumably keyed by provider id — confirm at the use sites.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
85
86#[async_trait]
88pub trait Connector: Send + Sync {
89 fn provider_id(&self) -> &ProviderId;
91
92 fn kinds(&self) -> &[TriggerKind];
94
95 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
97
98 async fn activate(
100 &self,
101 bindings: &[TriggerBinding],
102 ) -> Result<ActivationHandle, ConnectorError>;
103
104 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
106 Ok(())
107 }
108
109 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
111
112 async fn normalize_inbound_result(
115 &self,
116 raw: RawInbound,
117 ) -> Result<ConnectorNormalizeResult, ConnectorError> {
118 self.normalize_inbound(raw)
119 .await
120 .map(ConnectorNormalizeResult::event)
121 }
122
123 fn payload_schema(&self) -> ProviderPayloadSchema;
125
126 fn client(&self) -> Arc<dyn ConnectorClient>;
128}
129
130#[derive(Clone, Debug, PartialEq, Eq)]
132pub struct ConnectorHttpResponse {
133 pub status: u16,
134 pub headers: BTreeMap<String, String>,
135 pub body: JsonValue,
136}
137
138impl ConnectorHttpResponse {
139 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
140 Self {
141 status,
142 headers,
143 body,
144 }
145 }
146}
147
148#[derive(Clone, Debug, PartialEq)]
150pub enum ConnectorNormalizeResult {
151 Event(Box<TriggerEvent>),
152 Batch(Vec<TriggerEvent>),
153 ImmediateResponse {
154 response: ConnectorHttpResponse,
155 events: Vec<TriggerEvent>,
156 },
157 Reject(ConnectorHttpResponse),
158}
159
160impl ConnectorNormalizeResult {
161 pub fn event(event: TriggerEvent) -> Self {
162 Self::Event(Box::new(event))
163 }
164
165 pub fn into_events(self) -> Vec<TriggerEvent> {
166 match self {
167 Self::Event(event) => vec![*event],
168 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
169 Self::Reject(_) => Vec::new(),
170 }
171 }
172}
173
174#[derive(Clone, Debug, PartialEq)]
175pub enum PostNormalizeOutcome {
176 Ready(Box<TriggerEvent>),
177 DuplicateDropped,
178}
179
180pub async fn postprocess_normalized_event(
181 inbox: &InboxIndex,
182 binding_id: &str,
183 dedupe_enabled: bool,
184 dedupe_ttl: StdDuration,
185 mut event: TriggerEvent,
186) -> Result<PostNormalizeOutcome, ConnectorError> {
187 if dedupe_enabled && !event.dedupe_claimed() {
188 if !inbox
189 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
190 .await?
191 {
192 return Ok(PostNormalizeOutcome::DuplicateDropped);
193 }
194 event.mark_dedupe_claimed();
195 }
196
197 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
198}
199
/// Outbound client handed out by [`Connector::client`]: dispatches a named
/// method with JSON arguments and returns a JSON result.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with `args`; failures are reported as [`ClientError`].
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
205
206#[derive(Clone, Debug, PartialEq, Eq)]
208pub enum ClientError {
209 MethodNotFound(String),
210 InvalidArgs(String),
211 RateLimited(String),
212 Transport(String),
213 EgressBlocked(crate::egress::EgressBlocked),
214 Other(String),
215}
216
217impl fmt::Display for ClientError {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 match self {
220 Self::MethodNotFound(message)
221 | Self::InvalidArgs(message)
222 | Self::RateLimited(message)
223 | Self::Transport(message)
224 | Self::Other(message) => message.fmt(f),
225 Self::EgressBlocked(blocked) => blocked.fmt(f),
226 }
227 }
228}
229
230impl std::error::Error for ClientError {}
231
232#[derive(Debug)]
234pub enum ConnectorError {
235 DuplicateProvider(String),
236 DuplicateDelivery(String),
237 UnknownProvider(String),
238 MissingHeader(String),
239 InvalidHeader {
240 name: String,
241 detail: String,
242 },
243 InvalidSignature(String),
244 TimestampOutOfWindow {
245 timestamp: OffsetDateTime,
246 now: OffsetDateTime,
247 window: time::Duration,
248 },
249 Json(String),
250 Secret(String),
251 EventLog(String),
252 HarnRuntime(String),
253 Client(ClientError),
254 Unsupported(String),
255 Activation(String),
256}
257
258impl ConnectorError {
259 pub fn invalid_signature(message: impl Into<String>) -> Self {
260 Self::InvalidSignature(message.into())
261 }
262}
263
264impl fmt::Display for ConnectorError {
265 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266 match self {
267 Self::DuplicateProvider(provider) => {
268 write!(f, "connector provider `{provider}` is already registered")
269 }
270 Self::DuplicateDelivery(message) => message.fmt(f),
271 Self::UnknownProvider(provider) => {
272 write!(f, "connector provider `{provider}` is not registered")
273 }
274 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
275 Self::InvalidHeader { name, detail } => {
276 write!(f, "invalid header `{name}`: {detail}")
277 }
278 Self::InvalidSignature(message)
279 | Self::Json(message)
280 | Self::Secret(message)
281 | Self::EventLog(message)
282 | Self::HarnRuntime(message)
283 | Self::Unsupported(message)
284 | Self::Activation(message) => message.fmt(f),
285 Self::TimestampOutOfWindow {
286 timestamp,
287 now,
288 window,
289 } => write!(
290 f,
291 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
292 ),
293 Self::Client(error) => error.fmt(f),
294 }
295 }
296}
297
298impl std::error::Error for ConnectorError {}
299
300impl From<crate::event_log::LogError> for ConnectorError {
301 fn from(value: crate::event_log::LogError) -> Self {
302 Self::EventLog(value.to_string())
303 }
304}
305
306impl From<crate::secrets::SecretError> for ConnectorError {
307 fn from(value: crate::secrets::SecretError) -> Self {
308 Self::Secret(value.to_string())
309 }
310}
311
312impl From<serde_json::Error> for ConnectorError {
313 fn from(value: serde_json::Error) -> Self {
314 Self::Json(value.to_string())
315 }
316}
317
318impl From<ClientError> for ConnectorError {
319 fn from(value: ClientError) -> Self {
320 Self::Client(value)
321 }
322}
323
/// Shared runtime services passed to [`Connector::init`]. Every field is an
/// `Arc`, so cloning the context shares the same underlying services.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Event log backend.
    pub event_log: Arc<AnyEventLog>,
    /// Source of secrets.
    pub secrets: Arc<dyn SecretProvider>,
    /// Inbox index used for delivery deduplication.
    pub inbox: Arc<InboxIndex>,
    /// Metrics registry.
    pub metrics: Arc<MetricsRegistry>,
    /// Rate-limiter factory.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
333
/// Plain-value copy of [`MetricsRegistry`]'s fixed atomic counters, produced by
/// [`MetricsRegistry::snapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    /// Duplicates rejected on either the fast or the durable path.
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    /// Last value stored via `set_inbox_active_entries` (a gauge, not a running total).
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
350
/// Label name → label value for one metric series. A `BTreeMap` keeps labels in
/// a deterministic order, so equal label sets compare and render identically.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state for one histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative observation count per bucket, keyed by the rendered upper
    /// bound (`le`) string and including a `"+Inf"` bucket. Keys order
    /// lexicographically, not numerically.
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
359
360static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
361
362pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
363 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
364 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
365}
366
367pub fn clear_active_metrics_registry() {
368 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
369 *slot.lock().expect("active metrics registry poisoned") = None;
370 }
371}
372
373pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
374 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
375 slot.lock()
376 .expect("active metrics registry poisoned")
377 .clone()
378 })
379}
380
/// In-process metrics store, rendered as Prometheus text by
/// [`MetricsRegistry::render_prometheus`].
///
/// Fixed connector counters live in atomics. Custom counters, labelled
/// counters, gauges, histograms and pending-event bookkeeping live in
/// mutex-guarded maps.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Overwritten (not incremented) by `set_inbox_active_entries`.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Caller-named counters, exported as `connector_custom_<sanitized name>_total`.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled series keyed by (metric name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per label set: event id -> accepted-at time in ms; drives the
    // `harn_trigger_oldest_pending_age_seconds` gauge.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
402
403impl MetricsRegistry {
404 const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
405 const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
406 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
407 ];
408 const SIZE_BUCKETS: [f64; 9] = [
409 128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
410 ];
411
412 pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
413 ConnectorMetricsSnapshot {
414 inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
415 inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
416 inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
417 inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
418 inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
419 inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
420 linear_timestamp_rejections_total: self
421 .linear_timestamp_rejections_total
422 .load(Ordering::Relaxed),
423 dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
424 dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
425 retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
426 slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
427 slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
428 }
429 }
430
431 pub(crate) fn record_inbox_claim(&self) {
432 self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
433 }
434
435 pub(crate) fn record_inbox_duplicate_fast_path(&self) {
436 self.inbox_duplicates_rejected
437 .fetch_add(1, Ordering::Relaxed);
438 self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
439 }
440
441 pub(crate) fn record_inbox_duplicate_durable(&self) {
442 self.inbox_duplicates_rejected
443 .fetch_add(1, Ordering::Relaxed);
444 self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
445 }
446
447 pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
448 if count > 0 {
449 self.inbox_expired_entries
450 .fetch_add(count, Ordering::Relaxed);
451 }
452 }
453
454 pub(crate) fn set_inbox_active_entries(&self, count: usize) {
455 self.inbox_active_entries
456 .store(count as u64, Ordering::Relaxed);
457 }
458
459 pub fn record_linear_timestamp_rejection(&self) {
460 self.linear_timestamp_rejections_total
461 .fetch_add(1, Ordering::Relaxed);
462 }
463
464 pub fn record_dispatch_succeeded(&self) {
465 self.dispatch_succeeded_total
466 .fetch_add(1, Ordering::Relaxed);
467 }
468
469 pub fn record_dispatch_failed(&self) {
470 self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
471 }
472
473 pub fn record_retry_scheduled(&self) {
474 self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
475 }
476
477 pub fn record_slack_delivery_success(&self) {
478 self.slack_delivery_success_total
479 .fetch_add(1, Ordering::Relaxed);
480 }
481
482 pub fn record_slack_delivery_failure(&self) {
483 self.slack_delivery_failure_total
484 .fetch_add(1, Ordering::Relaxed);
485 }
486
487 pub fn record_custom_counter(&self, name: &str, amount: u64) {
488 if amount == 0 {
489 return;
490 }
491 let mut counters = self
492 .custom_counters
493 .lock()
494 .expect("custom counters poisoned");
495 *counters.entry(name.to_string()).or_default() += amount;
496 }
497
498 pub fn record_http_request(
499 &self,
500 endpoint: &str,
501 method: &str,
502 status: u16,
503 duration: StdDuration,
504 body_size_bytes: usize,
505 ) {
506 self.increment_counter(
507 "harn_http_requests_total",
508 labels([
509 ("endpoint", endpoint),
510 ("method", method),
511 ("status", &status.to_string()),
512 ]),
513 1,
514 );
515 self.observe_histogram(
516 "harn_http_request_duration_seconds",
517 labels([("endpoint", endpoint)]),
518 duration.as_secs_f64(),
519 &Self::DURATION_BUCKETS,
520 );
521 self.observe_histogram(
522 "harn_http_body_size_bytes",
523 labels([("endpoint", endpoint)]),
524 body_size_bytes as f64,
525 &Self::SIZE_BUCKETS,
526 );
527 }
528
529 pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
530 self.increment_counter(
531 "harn_trigger_received_total",
532 labels([("trigger_id", trigger_id), ("provider", provider)]),
533 1,
534 );
535 }
536
537 pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
538 self.increment_counter(
539 "harn_trigger_deduped_total",
540 labels([("trigger_id", trigger_id), ("reason", reason)]),
541 1,
542 );
543 }
544
545 pub fn record_trigger_predicate_evaluation(
546 &self,
547 trigger_id: &str,
548 result: bool,
549 cost_usd: f64,
550 ) {
551 self.increment_counter(
552 "harn_trigger_predicate_evaluations_total",
553 labels([
554 ("trigger_id", trigger_id),
555 ("result", if result { "true" } else { "false" }),
556 ]),
557 1,
558 );
559 self.observe_histogram(
560 "harn_trigger_predicate_cost_usd",
561 labels([("trigger_id", trigger_id)]),
562 cost_usd.max(0.0),
563 &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
564 );
565 }
566
567 pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
568 self.increment_counter(
569 "harn_trigger_dispatched_total",
570 labels([
571 ("trigger_id", trigger_id),
572 ("handler_kind", handler_kind),
573 ("outcome", outcome),
574 ]),
575 1,
576 );
577 }
578
579 pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
580 self.increment_counter(
581 "harn_trigger_retries_total",
582 labels([
583 ("trigger_id", trigger_id),
584 ("attempt", &attempt.to_string()),
585 ]),
586 1,
587 );
588 }
589
590 pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
591 self.increment_counter(
592 "harn_trigger_dlq_total",
593 labels([("trigger_id", trigger_id), ("reason", reason)]),
594 1,
595 );
596 }
597
598 pub fn record_trigger_accepted_to_normalized(
599 &self,
600 trigger_id: &str,
601 binding_key: &str,
602 provider: &str,
603 tenant_id: Option<&str>,
604 status: &str,
605 duration: StdDuration,
606 ) {
607 self.observe_histogram(
608 "harn_trigger_webhook_accepted_to_normalized_seconds",
609 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
610 duration.as_secs_f64(),
611 &Self::TRIGGER_LATENCY_BUCKETS,
612 );
613 }
614
615 pub fn record_trigger_accepted_to_queue_append(
616 &self,
617 trigger_id: &str,
618 binding_key: &str,
619 provider: &str,
620 tenant_id: Option<&str>,
621 status: &str,
622 duration: StdDuration,
623 ) {
624 self.observe_histogram(
625 "harn_trigger_webhook_accepted_to_queue_append_seconds",
626 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
627 duration.as_secs_f64(),
628 &Self::TRIGGER_LATENCY_BUCKETS,
629 );
630 }
631
632 pub fn record_trigger_queue_age_at_dispatch_admission(
633 &self,
634 trigger_id: &str,
635 binding_key: &str,
636 provider: &str,
637 tenant_id: Option<&str>,
638 status: &str,
639 age: StdDuration,
640 ) {
641 self.observe_histogram(
642 "harn_trigger_queue_age_at_dispatch_admission_seconds",
643 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
644 age.as_secs_f64(),
645 &Self::TRIGGER_LATENCY_BUCKETS,
646 );
647 }
648
649 pub fn record_trigger_queue_age_at_dispatch_start(
650 &self,
651 trigger_id: &str,
652 binding_key: &str,
653 provider: &str,
654 tenant_id: Option<&str>,
655 status: &str,
656 age: StdDuration,
657 ) {
658 self.observe_histogram(
659 "harn_trigger_queue_age_at_dispatch_start_seconds",
660 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
661 age.as_secs_f64(),
662 &Self::TRIGGER_LATENCY_BUCKETS,
663 );
664 }
665
666 pub fn record_trigger_dispatch_runtime(
667 &self,
668 trigger_id: &str,
669 binding_key: &str,
670 provider: &str,
671 tenant_id: Option<&str>,
672 status: &str,
673 duration: StdDuration,
674 ) {
675 self.observe_histogram(
676 "harn_trigger_dispatch_runtime_seconds",
677 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
678 duration.as_secs_f64(),
679 &Self::TRIGGER_LATENCY_BUCKETS,
680 );
681 }
682
683 pub fn record_trigger_retry_delay(
684 &self,
685 trigger_id: &str,
686 binding_key: &str,
687 provider: &str,
688 tenant_id: Option<&str>,
689 status: &str,
690 duration: StdDuration,
691 ) {
692 self.observe_histogram(
693 "harn_trigger_retry_delay_seconds",
694 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
695 duration.as_secs_f64(),
696 &Self::TRIGGER_LATENCY_BUCKETS,
697 );
698 }
699
700 pub fn record_trigger_accepted_to_dlq(
701 &self,
702 trigger_id: &str,
703 binding_key: &str,
704 provider: &str,
705 tenant_id: Option<&str>,
706 status: &str,
707 duration: StdDuration,
708 ) {
709 self.observe_histogram(
710 "harn_trigger_accepted_to_dlq_seconds",
711 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
712 duration.as_secs_f64(),
713 &Self::TRIGGER_LATENCY_BUCKETS,
714 );
715 }
716
717 pub fn note_trigger_pending_event(
718 &self,
719 event_id: &str,
720 trigger_id: &str,
721 binding_key: &str,
722 provider: &str,
723 tenant_id: Option<&str>,
724 accepted_at_ms: i64,
725 now_ms: i64,
726 ) {
727 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
728 {
729 let mut pending = self
730 .pending_trigger_events
731 .lock()
732 .expect("pending trigger events poisoned");
733 pending
734 .entry(labels.clone())
735 .or_default()
736 .insert(event_id.to_string(), accepted_at_ms);
737 }
738 self.refresh_oldest_pending_gauge(labels, now_ms);
739 }
740
741 pub fn clear_trigger_pending_event(
742 &self,
743 event_id: &str,
744 trigger_id: &str,
745 binding_key: &str,
746 provider: &str,
747 tenant_id: Option<&str>,
748 now_ms: i64,
749 ) {
750 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
751 {
752 let mut pending = self
753 .pending_trigger_events
754 .lock()
755 .expect("pending trigger events poisoned");
756 if let Some(events) = pending.get_mut(&labels) {
757 events.remove(event_id);
758 if events.is_empty() {
759 pending.remove(&labels);
760 }
761 }
762 }
763 self.refresh_oldest_pending_gauge(labels, now_ms);
764 }
765
766 pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
767 self.set_gauge(
768 "harn_trigger_inflight",
769 labels([("trigger_id", trigger_id)]),
770 count as f64,
771 );
772 }
773
774 pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
775 self.set_gauge(
776 "harn_trigger_budget_cost_today_usd",
777 labels([("trigger_id", trigger_id)]),
778 cost_usd.max(0.0),
779 );
780 }
781
782 pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
783 self.increment_counter(
784 "harn_trigger_budget_exhausted_total",
785 labels([("trigger_id", trigger_id), ("strategy", strategy)]),
786 1,
787 );
788 }
789
790 pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
791 self.increment_counter(
792 "harn_backpressure_events_total",
793 labels([("dimension", dimension), ("action", action)]),
794 1,
795 );
796 }
797
798 pub fn record_event_log_append(
799 &self,
800 topic: &str,
801 duration: StdDuration,
802 payload_bytes: usize,
803 ) {
804 self.observe_histogram(
805 "harn_event_log_append_duration_seconds",
806 labels([("topic", topic)]),
807 duration.as_secs_f64(),
808 &Self::DURATION_BUCKETS,
809 );
810 self.set_gauge(
811 "harn_event_log_topic_size_bytes",
812 labels([("topic", topic)]),
813 payload_bytes as f64,
814 );
815 }
816
817 pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
818 self.set_gauge(
819 "harn_event_log_consumer_lag",
820 labels([("topic", topic), ("consumer", consumer)]),
821 lag as f64,
822 );
823 }
824
825 pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
826 self.increment_counter(
827 "harn_a2a_hops_total",
828 labels([("target", target), ("outcome", outcome)]),
829 1,
830 );
831 self.observe_histogram(
832 "harn_a2a_hop_duration_seconds",
833 labels([("target", target)]),
834 duration.as_secs_f64(),
835 &Self::DURATION_BUCKETS,
836 );
837 }
838
839 pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
840 self.set_gauge(
841 "harn_worker_queue_depth",
842 labels([("queue", queue)]),
843 depth as f64,
844 );
845 }
846
847 pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
848 self.observe_histogram(
849 "harn_worker_queue_claim_age_seconds",
850 labels([("queue", queue)]),
851 age_seconds.max(0.0),
852 &Self::DURATION_BUCKETS,
853 );
854 }
855
856 pub fn record_scheduler_selection(
858 &self,
859 queue: &str,
860 fairness_dimension: &str,
861 fairness_key: &str,
862 ) {
863 self.increment_counter(
864 "harn_scheduler_selections_total",
865 labels([
866 ("queue", queue),
867 ("fairness_dimension", fairness_dimension),
868 ("fairness_key", fairness_key),
869 ]),
870 1,
871 );
872 }
873
874 pub fn record_scheduler_deferral(
877 &self,
878 queue: &str,
879 fairness_dimension: &str,
880 fairness_key: &str,
881 ) {
882 self.increment_counter(
883 "harn_scheduler_deferrals_total",
884 labels([
885 ("queue", queue),
886 ("fairness_dimension", fairness_dimension),
887 ("fairness_key", fairness_key),
888 ]),
889 1,
890 );
891 }
892
893 pub fn record_scheduler_starvation_promotion(
895 &self,
896 queue: &str,
897 fairness_dimension: &str,
898 fairness_key: &str,
899 ) {
900 self.increment_counter(
901 "harn_scheduler_starvation_promotions_total",
902 labels([
903 ("queue", queue),
904 ("fairness_dimension", fairness_dimension),
905 ("fairness_key", fairness_key),
906 ]),
907 1,
908 );
909 }
910
911 pub fn set_scheduler_deficit(
913 &self,
914 queue: &str,
915 fairness_dimension: &str,
916 fairness_key: &str,
917 deficit: i64,
918 ) {
919 self.set_gauge(
920 "harn_scheduler_deficit",
921 labels([
922 ("queue", queue),
923 ("fairness_dimension", fairness_dimension),
924 ("fairness_key", fairness_key),
925 ]),
926 deficit as f64,
927 );
928 }
929
930 pub fn set_scheduler_oldest_eligible_age(
932 &self,
933 queue: &str,
934 fairness_dimension: &str,
935 fairness_key: &str,
936 age_ms: u64,
937 ) {
938 self.set_gauge(
939 "harn_scheduler_oldest_eligible_age_seconds",
940 labels([
941 ("queue", queue),
942 ("fairness_dimension", fairness_dimension),
943 ("fairness_key", fairness_key),
944 ]),
945 age_ms as f64 / 1000.0,
946 );
947 }
948
949 pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
950 self.set_gauge(
951 "harn_orchestrator_pump_backlog",
952 labels([("topic", topic)]),
953 count as f64,
954 );
955 }
956
957 pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
958 self.set_gauge(
959 "harn_orchestrator_pump_outstanding",
960 labels([("topic", topic)]),
961 count as f64,
962 );
963 }
964
965 pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
966 self.observe_histogram(
967 "harn_orchestrator_pump_admission_delay_seconds",
968 labels([("topic", topic)]),
969 duration.as_secs_f64(),
970 &Self::DURATION_BUCKETS,
971 );
972 }
973
974 pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
975 self.increment_counter(
976 "harn_llm_calls_total",
977 labels([
978 ("provider", provider),
979 ("model", model),
980 ("outcome", outcome),
981 ]),
982 1,
983 );
984 if cost_usd > 0.0 {
985 self.increment_counter(
986 "harn_llm_cost_usd_total",
987 labels([("provider", provider), ("model", model)]),
988 cost_usd,
989 );
990 } else {
991 self.ensure_counter(
992 "harn_llm_cost_usd_total",
993 labels([("provider", provider), ("model", model)]),
994 );
995 }
996 }
997
998 pub fn record_llm_cache_hit(&self, provider: &str) {
999 self.increment_counter(
1000 "harn_llm_cache_hits_total",
1001 labels([("provider", provider)]),
1002 1,
1003 );
1004 }
1005
1006 pub fn record_schema_stream_aborted(&self, provider: &str, model: &str) {
1012 self.increment_counter(
1013 "harn_llm_schema_stream_aborted_total",
1014 labels([("provider", provider), ("model", model)]),
1015 1,
1016 );
1017 }
1018
1019 pub fn render_prometheus(&self) -> String {
1020 let snapshot = self.snapshot();
1021 let counters = [
1022 (
1023 "connector_linear_timestamp_rejections_total",
1024 snapshot.linear_timestamp_rejections_total,
1025 ),
1026 (
1027 "dispatch_succeeded_total",
1028 snapshot.dispatch_succeeded_total,
1029 ),
1030 ("dispatch_failed_total", snapshot.dispatch_failed_total),
1031 ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1032 ("retry_scheduled_total", snapshot.retry_scheduled_total),
1033 (
1034 "slack_events_delivery_success_total",
1035 snapshot.slack_delivery_success_total,
1036 ),
1037 (
1038 "slack_events_delivery_failure_total",
1039 snapshot.slack_delivery_failure_total,
1040 ),
1041 ];
1042
1043 let mut rendered = String::new();
1044 for (name, value) in counters {
1045 rendered.push_str("# TYPE ");
1046 rendered.push_str(name);
1047 rendered.push_str(" counter\n");
1048 rendered.push_str(name);
1049 rendered.push(' ');
1050 rendered.push_str(&value.to_string());
1051 rendered.push('\n');
1052 }
1053 let custom_counters = self
1054 .custom_counters
1055 .lock()
1056 .expect("custom counters poisoned");
1057 for (name, value) in custom_counters.iter() {
1058 let metric_name = format!(
1059 "connector_custom_{}_total",
1060 name.chars()
1061 .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1062 ch
1063 } else {
1064 '_'
1065 })
1066 .collect::<String>()
1067 );
1068 rendered.push_str("# TYPE ");
1069 rendered.push_str(&metric_name);
1070 rendered.push_str(" counter\n");
1071 rendered.push_str(&metric_name);
1072 rendered.push(' ');
1073 rendered.push_str(&value.to_string());
1074 rendered.push('\n');
1075 }
1076 rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1077 rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1078 rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1079 rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1080 self.render_generic_metrics(&mut rendered);
1081 rendered
1082 }
1083
1084 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1085 let amount = amount.into();
1086 if amount <= 0.0 || !amount.is_finite() {
1087 return;
1088 }
1089 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1090 *counters.entry((name.to_string(), labels)).or_default() += amount;
1091 }
1092
1093 fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1094 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1095 counters.entry((name.to_string(), labels)).or_default();
1096 }
1097
1098 fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1099 let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1100 gauges.insert((name.to_string(), labels), value);
1101 }
1102
1103 fn observe_histogram(
1104 &self,
1105 name: &str,
1106 labels: MetricLabels,
1107 value: f64,
1108 bucket_bounds: &[f64],
1109 ) {
1110 if !value.is_finite() {
1111 return;
1112 }
1113 let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1114 let histogram = histograms
1115 .entry((name.to_string(), labels))
1116 .or_insert_with(|| HistogramMetric {
1117 buckets: bucket_bounds
1118 .iter()
1119 .map(|bound| (prometheus_float(*bound), 0))
1120 .chain(std::iter::once(("+Inf".to_string(), 0)))
1121 .collect(),
1122 count: 0,
1123 sum: 0.0,
1124 });
1125 histogram.count += 1;
1126 histogram.sum += value;
1127 for bound in bucket_bounds {
1128 if value <= *bound {
1129 let key = prometheus_float(*bound);
1130 *histogram.buckets.entry(key).or_default() += 1;
1131 }
1132 }
1133 *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1134 }
1135
1136 fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1137 let oldest_accepted_at_ms = self
1138 .pending_trigger_events
1139 .lock()
1140 .expect("pending trigger events poisoned")
1141 .get(&labels)
1142 .and_then(|events| events.values().min().copied());
1143 let age_seconds = oldest_accepted_at_ms
1144 .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1145 .unwrap_or(0.0);
1146 self.set_gauge(
1147 "harn_trigger_oldest_pending_age_seconds",
1148 labels,
1149 age_seconds,
1150 );
1151 }
1152
1153 fn render_generic_metrics(&self, rendered: &mut String) {
1154 let counters = self
1155 .counters
1156 .lock()
1157 .expect("metrics counters poisoned")
1158 .clone();
1159 let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1160 let histograms = self
1161 .histograms
1162 .lock()
1163 .expect("metrics histograms poisoned")
1164 .clone();
1165
1166 for name in metric_family_names(MetricKind::Counter) {
1167 rendered.push_str("# TYPE ");
1168 rendered.push_str(name);
1169 rendered.push_str(" counter\n");
1170 for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1171 render_sample(rendered, sample_name, labels, *value);
1172 }
1173 }
1174 for name in metric_family_names(MetricKind::Gauge) {
1175 rendered.push_str("# TYPE ");
1176 rendered.push_str(name);
1177 rendered.push_str(" gauge\n");
1178 for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1179 render_sample(rendered, sample_name, labels, *value);
1180 }
1181 }
1182 for name in metric_family_names(MetricKind::Histogram) {
1183 rendered.push_str("# TYPE ");
1184 rendered.push_str(name);
1185 rendered.push_str(" histogram\n");
1186 for ((sample_name, labels), histogram) in
1187 histograms.iter().filter(|((n, _), _)| n == name)
1188 {
1189 for (le, value) in &histogram.buckets {
1190 let mut bucket_labels = labels.clone();
1191 bucket_labels.insert("le".to_string(), le.clone());
1192 render_sample(
1193 rendered,
1194 &format!("{sample_name}_bucket"),
1195 &bucket_labels,
1196 *value as f64,
1197 );
1198 }
1199 render_sample(
1200 rendered,
1201 &format!("{sample_name}_sum"),
1202 labels,
1203 histogram.sum,
1204 );
1205 render_sample(
1206 rendered,
1207 &format!("{sample_name}_count"),
1208 labels,
1209 histogram.count as f64,
1210 );
1211 }
1212 }
1213 }
1214}
1215
/// Metric type of an exported family; selects which list
/// `metric_family_names` returns.
#[derive(Clone, Copy)]
enum MetricKind {
    /// Families rendered as `# TYPE <name> counter`.
    Counter,
    /// Families rendered as `# TYPE <name> gauge`.
    Gauge,
    /// Families rendered as `# TYPE <name> histogram`.
    Histogram,
}
1222
1223fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
1224 match kind {
1225 MetricKind::Counter => &[
1226 "harn_http_requests_total",
1227 "harn_trigger_received_total",
1228 "harn_trigger_deduped_total",
1229 "harn_trigger_predicate_evaluations_total",
1230 "harn_trigger_dispatched_total",
1231 "harn_trigger_retries_total",
1232 "harn_trigger_dlq_total",
1233 "harn_trigger_budget_exhausted_total",
1234 "harn_backpressure_events_total",
1235 "harn_a2a_hops_total",
1236 "harn_llm_calls_total",
1237 "harn_llm_cost_usd_total",
1238 "harn_llm_cache_hits_total",
1239 "harn_llm_schema_stream_aborted_total",
1240 "harn_scheduler_selections_total",
1241 "harn_scheduler_deferrals_total",
1242 "harn_scheduler_starvation_promotions_total",
1243 ],
1244 MetricKind::Gauge => &[
1245 "harn_trigger_inflight",
1246 "harn_event_log_topic_size_bytes",
1247 "harn_event_log_consumer_lag",
1248 "harn_trigger_budget_cost_today_usd",
1249 "harn_worker_queue_depth",
1250 "harn_orchestrator_pump_backlog",
1251 "harn_orchestrator_pump_outstanding",
1252 "harn_trigger_oldest_pending_age_seconds",
1253 "harn_scheduler_deficit",
1254 "harn_scheduler_oldest_eligible_age_seconds",
1255 ],
1256 MetricKind::Histogram => &[
1257 "harn_http_request_duration_seconds",
1258 "harn_http_body_size_bytes",
1259 "harn_trigger_predicate_cost_usd",
1260 "harn_event_log_append_duration_seconds",
1261 "harn_a2a_hop_duration_seconds",
1262 "harn_worker_queue_claim_age_seconds",
1263 "harn_orchestrator_pump_admission_delay_seconds",
1264 "harn_trigger_webhook_accepted_to_normalized_seconds",
1265 "harn_trigger_webhook_accepted_to_queue_append_seconds",
1266 "harn_trigger_queue_age_at_dispatch_admission_seconds",
1267 "harn_trigger_queue_age_at_dispatch_start_seconds",
1268 "harn_trigger_dispatch_runtime_seconds",
1269 "harn_trigger_retry_delay_seconds",
1270 "harn_trigger_accepted_to_dlq_seconds",
1271 ],
1272 }
1273}
1274
1275fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1276 pairs
1277 .into_iter()
1278 .map(|(name, value)| (name.to_string(), value.to_string()))
1279 .collect()
1280}
1281
1282fn trigger_lifecycle_labels(
1283 trigger_id: &str,
1284 binding_key: &str,
1285 provider: &str,
1286 tenant_id: Option<&str>,
1287 status: &str,
1288) -> MetricLabels {
1289 labels([
1290 ("binding_key", binding_key),
1291 ("provider", provider),
1292 ("status", status),
1293 ("tenant_id", tenant_label(tenant_id)),
1294 ("trigger_id", trigger_id),
1295 ])
1296}
1297
1298fn trigger_pending_labels(
1299 trigger_id: &str,
1300 binding_key: &str,
1301 provider: &str,
1302 tenant_id: Option<&str>,
1303) -> MetricLabels {
1304 labels([
1305 ("binding_key", binding_key),
1306 ("provider", provider),
1307 ("tenant_id", tenant_label(tenant_id)),
1308 ("trigger_id", trigger_id),
1309 ])
1310}
1311
/// Metric label value for a tenant.
///
/// Returns the trimmed id, or `"none"` when the id is absent or only whitespace.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1318
/// Non-negative gap between two millisecond timestamps.
///
/// When `later_ms < earlier_ms` the result is zero. The subtraction saturates,
/// so extreme inputs never overflow.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    StdDuration::from_millis(u64::try_from(delta_ms).unwrap_or(0))
}
1322
1323fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1324 rendered.push_str(name);
1325 if !labels.is_empty() {
1326 rendered.push('{');
1327 for (index, (label, label_value)) in labels.iter().enumerate() {
1328 if index > 0 {
1329 rendered.push(',');
1330 }
1331 rendered.push_str(label);
1332 rendered.push_str("=\"");
1333 rendered.push_str(&escape_label_value(label_value));
1334 rendered.push('"');
1335 }
1336 rendered.push('}');
1337 }
1338 rendered.push(' ');
1339 rendered.push_str(&prometheus_float(value));
1340 rendered.push('\n');
1341}
1342
/// Escapes a label value for the Prometheus text format.
///
/// Backslash becomes `\\`, double quote becomes `\"`, and line feed becomes
/// `\n`. Every other character is copied unchanged.
///
/// The output is built in one preallocated `String`; the previous version
/// allocated a `Vec<char>` per input character.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1354
/// Formats a float for the Prometheus text format.
///
/// - Special values use the spellings Prometheus expects: `+Inf`, `-Inf`, `NaN`.
///   Previously negative infinity fell through to `format!` and produced
///   `"-inf"`, which is not valid.
/// - Integral values are written without a fractional part (`3.0` -> `"3"`).
/// - All other values are rounded to 6 decimal places, then trailing zeros and a
///   dangling `.` are stripped (`0.002` -> `"0.002"`). Magnitudes below `5e-7`
///   therefore render as `"0"`.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1369
1370#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1372pub struct ProviderPayloadSchema {
1373 pub harn_schema_name: String,
1374 #[serde(default)]
1375 pub json_schema: JsonValue,
1376}
1377
1378impl ProviderPayloadSchema {
1379 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1380 Self {
1381 harn_schema_name: harn_schema_name.into(),
1382 json_schema,
1383 }
1384 }
1385
1386 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1387 Self::new(harn_schema_name, JsonValue::Null)
1388 }
1389}
1390
1391impl Default for ProviderPayloadSchema {
1392 fn default() -> Self {
1393 Self::named("raw")
1394 }
1395}
1396
1397#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1399#[serde(transparent)]
1400pub struct TriggerKind(String);
1401
1402impl TriggerKind {
1403 pub fn new(value: impl Into<String>) -> Self {
1404 Self(value.into())
1405 }
1406
1407 pub fn as_str(&self) -> &str {
1408 self.0.as_str()
1409 }
1410}
1411
1412impl From<&str> for TriggerKind {
1413 fn from(value: &str) -> Self {
1414 Self::new(value)
1415 }
1416}
1417
1418impl From<String> for TriggerKind {
1419 fn from(value: String) -> Self {
1420 Self::new(value)
1421 }
1422}
1423
1424#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1426pub struct TriggerBinding {
1427 pub provider: ProviderId,
1428 pub kind: TriggerKind,
1429 pub binding_id: String,
1430 #[serde(default)]
1431 pub dedupe_key: Option<String>,
1432 #[serde(default = "default_dedupe_retention_days")]
1433 pub dedupe_retention_days: u32,
1434 #[serde(default)]
1435 pub config: JsonValue,
1436}
1437
1438impl TriggerBinding {
1439 pub fn new(
1440 provider: ProviderId,
1441 kind: impl Into<TriggerKind>,
1442 binding_id: impl Into<String>,
1443 ) -> Self {
1444 Self {
1445 provider,
1446 kind: kind.into(),
1447 binding_id: binding_id.into(),
1448 dedupe_key: None,
1449 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1450 config: JsonValue::Null,
1451 }
1452 }
1453}
1454
1455fn default_dedupe_retention_days() -> u32 {
1456 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1457}
1458
1459#[derive(Clone, Debug, Default)]
1461pub struct TriggerRegistry {
1462 bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
1463}
1464
1465impl TriggerRegistry {
1466 pub fn register(&mut self, binding: TriggerBinding) {
1467 self.bindings
1468 .entry(binding.provider.clone())
1469 .or_default()
1470 .push(binding);
1471 }
1472
1473 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1474 &self.bindings
1475 }
1476
1477 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1478 self.bindings
1479 .get(provider)
1480 .map(Vec::as_slice)
1481 .unwrap_or(&[])
1482 }
1483}
1484
1485#[derive(Clone, Debug, PartialEq, Eq)]
1487pub struct ActivationHandle {
1488 pub provider: ProviderId,
1489 pub binding_count: usize,
1490}
1491
1492impl ActivationHandle {
1493 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1494 Self {
1495 provider,
1496 binding_count,
1497 }
1498 }
1499}
1500
1501#[derive(Clone, Debug, PartialEq, Eq)]
1503pub struct RawInbound {
1504 pub kind: String,
1505 pub headers: BTreeMap<String, String>,
1506 pub query: BTreeMap<String, String>,
1507 pub body: Vec<u8>,
1508 pub received_at: OffsetDateTime,
1509 pub occurred_at: Option<OffsetDateTime>,
1510 pub tenant_id: Option<TenantId>,
1511 pub metadata: JsonValue,
1512}
1513
1514impl RawInbound {
1515 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1516 Self {
1517 kind: kind.into(),
1518 headers,
1519 query: BTreeMap::new(),
1520 body,
1521 received_at: clock::now_utc(),
1522 occurred_at: None,
1523 tenant_id: None,
1524 metadata: JsonValue::Null,
1525 }
1526 }
1527
1528 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1529 Ok(serde_json::from_slice(&self.body)?)
1530 }
1531}
1532
/// Token-bucket parameters shared by every bucket a `RateLimiterFactory`
/// creates.
///
/// A bucket holds at most `capacity` tokens and regains them at a rate of
/// `refill_tokens` per `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}

impl Default for RateLimitConfig {
    /// A 60-token bucket that refills one token per second.
    fn default() -> Self {
        let refill_interval = StdDuration::from_secs(1);
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval,
        }
    }
}
1550
1551#[derive(Clone, Debug)]
1552struct TokenBucket {
1553 tokens: f64,
1554 last_refill: ClockInstant,
1555}
1556
1557impl TokenBucket {
1558 fn full(config: RateLimitConfig) -> Self {
1559 Self {
1560 tokens: config.capacity as f64,
1561 last_refill: clock::instant_now(),
1562 }
1563 }
1564
1565 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1566 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1567 let rate = config.refill_tokens.max(1) as f64 / interval;
1568 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1569 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1570 self.last_refill = now;
1571 }
1572
1573 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1574 self.refill(config, now);
1575 if self.tokens >= 1.0 {
1576 self.tokens -= 1.0;
1577 true
1578 } else {
1579 false
1580 }
1581 }
1582
1583 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1584 if self.tokens >= 1.0 {
1585 return StdDuration::ZERO;
1586 }
1587 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1588 let rate = config.refill_tokens.max(1) as f64 / interval;
1589 let missing = (1.0 - self.tokens).max(0.0);
1590 StdDuration::from_secs_f64((missing / rate).max(0.001))
1591 }
1592}
1593
1594#[derive(Debug)]
1596pub struct RateLimiterFactory {
1597 config: RateLimitConfig,
1598 buckets: Mutex<HashMap<(String, String), TokenBucket>>,
1599}
1600
1601impl RateLimiterFactory {
1602 pub fn new(config: RateLimitConfig) -> Self {
1603 Self {
1604 config,
1605 buckets: Mutex::new(HashMap::new()),
1606 }
1607 }
1608
1609 pub fn config(&self) -> RateLimitConfig {
1610 self.config
1611 }
1612
1613 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1614 ScopedRateLimiter {
1615 factory: self,
1616 provider: provider.clone(),
1617 key: key.into(),
1618 }
1619 }
1620
1621 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1622 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1623 let bucket = buckets
1624 .entry((provider.as_str().to_string(), key.to_string()))
1625 .or_insert_with(|| TokenBucket::full(self.config));
1626 bucket.try_acquire(self.config, clock::instant_now())
1627 }
1628
1629 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1630 loop {
1631 let wait = {
1632 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1633 let bucket = buckets
1634 .entry((provider.as_str().to_string(), key.to_string()))
1635 .or_insert_with(|| TokenBucket::full(self.config));
1636 if bucket.try_acquire(self.config, clock::instant_now()) {
1637 return;
1638 }
1639 bucket.wait_duration(self.config)
1640 };
1641 clock::sleep(wait).await;
1646 }
1647 }
1648}
1649
1650impl Default for RateLimiterFactory {
1651 fn default() -> Self {
1652 Self::new(RateLimitConfig::default())
1653 }
1654}
1655
1656#[derive(Clone, Debug)]
1658pub struct ScopedRateLimiter<'a> {
1659 factory: &'a RateLimiterFactory,
1660 provider: ProviderId,
1661 key: String,
1662}
1663
1664impl<'a> ScopedRateLimiter<'a> {
1665 pub fn try_acquire(&self) -> bool {
1666 self.factory.try_acquire(&self.provider, &self.key)
1667 }
1668
1669 pub async fn acquire(&self) {
1670 self.factory.acquire(&self.provider, &self.key).await;
1671 }
1672}
1673
1674pub struct ConnectorRegistry {
1676 connectors: BTreeMap<ProviderId, ConnectorHandle>,
1677}
1678
1679impl ConnectorRegistry {
1680 pub fn empty() -> Self {
1681 Self {
1682 connectors: BTreeMap::new(),
1683 }
1684 }
1685
1686 pub fn with_defaults() -> Self {
1687 let mut registry = Self::empty();
1688 for provider in registered_provider_metadata() {
1689 if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1690 continue;
1691 }
1692 registry
1693 .register(default_connector_for_provider(&provider))
1694 .expect("default connector registration should not fail");
1695 }
1696 registry
1697 }
1698
1699 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1700 let provider = connector.provider_id().clone();
1701 if self.connectors.contains_key(&provider) {
1702 return Err(ConnectorError::DuplicateProvider(provider.0));
1703 }
1704 self.connectors
1705 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1706 Ok(())
1707 }
1708
1709 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1710 self.connectors.get(id).cloned()
1711 }
1712
1713 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1714 self.connectors.remove(id)
1715 }
1716
1717 pub fn list(&self) -> Vec<ProviderId> {
1718 self.connectors.keys().cloned().collect()
1719 }
1720
1721 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1722 for connector in self.connectors.values() {
1723 connector.lock().await.init(ctx.clone()).await?;
1724 }
1725 Ok(())
1726 }
1727
1728 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1729 let mut clients = BTreeMap::new();
1730 for (provider, connector) in &self.connectors {
1731 let client = connector.lock().await.client();
1732 clients.insert(provider.clone(), client);
1733 }
1734 clients
1735 }
1736
1737 pub async fn activate_all(
1738 &self,
1739 registry: &TriggerRegistry,
1740 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1741 let mut handles = Vec::new();
1742 for (provider, connector) in &self.connectors {
1743 let bindings = registry.bindings_for(provider);
1744 if bindings.is_empty() {
1745 continue;
1746 }
1747 let connector = connector.lock().await;
1748 handles.push(connector.activate(bindings).await?);
1749 }
1750 Ok(handles)
1751 }
1752}
1753
1754impl Default for ConnectorRegistry {
1755 fn default() -> Self {
1756 Self::with_defaults()
1757 }
1758}
1759
1760fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1761 match &provider.runtime {
1762 ProviderRuntimeMetadata::Builtin {
1763 connector,
1764 default_signature_variant,
1765 } => match connector.as_str() {
1766 "a2a-push" => Box::new(A2aPushConnector::new()),
1767 "cron" => Box::new(CronConnector::new()),
1768 "stream" => Box::new(StreamConnector::new(
1769 ProviderId::from(provider.provider.clone()),
1770 provider.schema_name.clone(),
1771 )),
1772 "webhook" => {
1773 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1774 .expect("catalog webhook signature variant must be valid");
1775 Box::new(GenericWebhookConnector::with_profile(
1776 WebhookProviderProfile::new(
1777 ProviderId::from(provider.provider.clone()),
1778 provider.schema_name.clone(),
1779 variant,
1780 ),
1781 ))
1782 }
1783 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1784 },
1785 ProviderRuntimeMetadata::Placeholder => {
1786 Box::new(PlaceholderConnector::from_metadata(provider))
1787 }
1788 }
1789}
1790
1791struct PlaceholderConnector {
1792 provider_id: ProviderId,
1793 kinds: Vec<TriggerKind>,
1794 schema_name: String,
1795}
1796
1797impl PlaceholderConnector {
1798 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1799 Self {
1800 provider_id: ProviderId::from(metadata.provider.clone()),
1801 kinds: metadata
1802 .kinds
1803 .iter()
1804 .cloned()
1805 .map(TriggerKind::from)
1806 .collect(),
1807 schema_name: metadata.schema_name.clone(),
1808 }
1809 }
1810}
1811
1812struct PlaceholderClient;
1813
1814#[async_trait]
1815impl ConnectorClient for PlaceholderClient {
1816 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1817 Err(ClientError::Other(format!(
1818 "connector client method '{method}' is not implemented for this provider"
1819 )))
1820 }
1821}
1822
1823#[async_trait]
1824impl Connector for PlaceholderConnector {
1825 fn provider_id(&self) -> &ProviderId {
1826 &self.provider_id
1827 }
1828
1829 fn kinds(&self) -> &[TriggerKind] {
1830 &self.kinds
1831 }
1832
1833 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1834 Ok(())
1835 }
1836
1837 async fn activate(
1838 &self,
1839 bindings: &[TriggerBinding],
1840 ) -> Result<ActivationHandle, ConnectorError> {
1841 Ok(ActivationHandle::new(
1842 self.provider_id.clone(),
1843 bindings.len(),
1844 ))
1845 }
1846
1847 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1848 Err(ConnectorError::Unsupported(format!(
1849 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1850 self.provider_id.as_str()
1851 )))
1852 }
1853
1854 fn payload_schema(&self) -> ProviderPayloadSchema {
1855 ProviderPayloadSchema::named(self.schema_name.clone())
1856 }
1857
1858 fn client(&self) -> Arc<dyn ConnectorClient> {
1859 Arc::new(PlaceholderClient)
1860 }
1861}
1862
1863pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1864 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1865 *slot.borrow_mut() = clients
1866 .into_iter()
1867 .map(|(provider, client)| (provider.as_str().to_string(), client))
1868 .collect();
1869 });
1870}
1871
1872pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1873 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1874}
1875
1876pub fn clear_active_connector_clients() {
1877 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1878}
1879
// Unit tests covering:
// - the connector registry (duplicate rejection, activation filtering, default catalog);
// - the rate limiter's per-scope buckets;
// - raw inbound JSON parsing;
// - the builtin-connector catalog policy;
// - the Prometheus exposition output.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    // Client that echoes the called method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    // Minimal connector that counts `activate` calls through a shared counter.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // A second connector with an already-registered provider id must be
    // rejected with `DuplicateProvider` carrying that id.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only providers with bindings are activated, and each activated connector
    // receives all of its provider's bindings in a single call.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        // Two github bindings; slack gets none.
        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // Capacity 1 with a one-minute refill: a second acquire on the same
    // (provider, key) scope fails, while other scopes have their own buckets.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_mins(1),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry contains the core builtin providers but none of the
    // service providers (e.g. github, slack).
    #[test]
    fn connector_registry_lists_core_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("webhook")));
        assert!(providers.contains(&ProviderId::from("kafka")));
        assert!(!providers.contains(&ProviderId::from("github")));
        assert!(!providers.contains(&ProviderId::from("slack")));
    }

    // Guard rail: any catalog provider with a `Builtin` runtime must be on this
    // allow-list of core runtime providers.
    #[test]
    fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
        let core_runtime_providers = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        for provider in registered_provider_metadata() {
            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
                continue;
            }

            let allowed_core = core_runtime_providers.contains(&provider.provider.as_str());
            assert!(
                allowed_core,
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }

    // Records one sample for every metric family and checks the exact
    // Prometheus exposition lines (labels appear in sorted order).
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Per-binding lifecycle histograms, all tagged with tenant "tenant-a".
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        // NOTE(review): the trailing 1_000 / 4_000 look like accepted-at and now
        // timestamps in ms; the oldest-pending needle below expects a 3 s age.
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Every needle must appear verbatim in the rendered exposition text.
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}