1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44 connector_export_denied_builtin_reason, connector_export_effect_class,
45 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57 SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60 paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61 ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
/// Upper bound applied to every outbound HTTP request a connector issues.
const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70 reqwest::Client::builder()
71 .user_agent(user_agent)
72 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73 .redirect(reqwest::redirect::Policy::custom(|attempt| {
74 if attempt.previous().len() >= 10 {
75 attempt.error("too many redirects")
76 } else if crate::egress::redirect_url_allowed(
77 "connector_redirect",
78 attempt.url().as_str(),
79 ) {
80 attempt.follow()
81 } else {
82 attempt.error("egress policy blocked redirect target")
83 }
84 }))
85 .build()
86 .expect("connector HTTP client configuration should be valid")
87}
88
/// Shared, async-lock-guarded handle to a boxed connector implementation.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map of connector clients keyed by a string id.
    // NOTE(review): the population/consumption sites are not visible in this
    // chunk — presumably set while a connector call is active on this thread;
    // confirm at the usage sites.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
96
/// Contract implemented by every trigger connector (webhook, cron, stream, …).
#[async_trait]
pub trait Connector: Send + Sync {
    /// Stable identifier of the provider this connector serves.
    fn provider_id(&self) -> &ProviderId;

    /// Trigger kinds this connector can handle.
    fn kinds(&self) -> &[TriggerKind];

    /// One-time initialization with the shared runtime context.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the given bindings, returning a handle for the activation.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Graceful shutdown hook; the default implementation is a no-op that
    /// ignores the deadline.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts a raw inbound delivery into a single normalized trigger event.
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Richer normalization entry point; by default wraps the single event
    /// produced by `normalize_inbound`. Connectors that emit batches or
    /// immediate HTTP responses override this.
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Schema describing the payloads this provider emits.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Client used to make provider-specific calls on behalf of this connector.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
140
/// HTTP response a connector asks the inbound pipeline to return to a caller.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code to return.
    pub status: u16,
    /// Response headers (name → value).
    pub headers: BTreeMap<String, String>,
    /// JSON response body.
    pub body: JsonValue,
}
148
149impl ConnectorHttpResponse {
150 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
151 Self {
152 status,
153 headers,
154 body,
155 }
156 }
157}
158
/// Outcome of normalizing a raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one normalized trigger event (boxed to keep the enum small).
    Event(Box<TriggerEvent>),
    /// Several events produced from one delivery.
    Batch(Vec<TriggerEvent>),
    /// Events plus an HTTP response the caller must send immediately
    /// (e.g. a provider URL-verification handshake).
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// Delivery rejected; only the given HTTP response is returned.
    Reject(ConnectorHttpResponse),
}
170
171impl ConnectorNormalizeResult {
172 pub fn event(event: TriggerEvent) -> Self {
173 Self::Event(Box::new(event))
174 }
175
176 pub fn into_events(self) -> Vec<TriggerEvent> {
177 match self {
178 Self::Event(event) => vec![*event],
179 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
180 Self::Reject(_) => Vec::new(),
181 }
182 }
183}
184
/// Result of post-normalization dedupe processing.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// Event passed dedupe and may be dispatched.
    Ready(Box<TriggerEvent>),
    /// Event's dedupe key was already claimed; it was dropped.
    DuplicateDropped,
}
190
191pub async fn postprocess_normalized_event(
192 inbox: &InboxIndex,
193 binding_id: &str,
194 dedupe_enabled: bool,
195 dedupe_ttl: StdDuration,
196 mut event: TriggerEvent,
197) -> Result<PostNormalizeOutcome, ConnectorError> {
198 if dedupe_enabled && !event.dedupe_claimed() {
199 if !inbox
200 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
201 .await?
202 {
203 return Ok(PostNormalizeOutcome::DuplicateDropped);
204 }
205 event.mark_dedupe_claimed();
206 }
207
208 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
209}
210
/// Provider-facing RPC surface: invokes a named method with JSON arguments.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Calls `method` with `args`, returning the provider's JSON result.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
216
/// Errors returned by [`ConnectorClient::call`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method name is not supported.
    MethodNotFound(String),
    /// Arguments failed validation.
    InvalidArgs(String),
    /// Provider rate-limited the call.
    RateLimited(String),
    /// Network/transport-level failure.
    Transport(String),
    /// Egress policy refused the outbound destination.
    EgressBlocked(crate::egress::EgressBlocked),
    /// Any other failure.
    Other(String),
}
227
228impl fmt::Display for ClientError {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 match self {
231 Self::MethodNotFound(message)
232 | Self::InvalidArgs(message)
233 | Self::RateLimited(message)
234 | Self::Transport(message)
235 | Self::Other(message) => message.fmt(f),
236 Self::EgressBlocked(blocked) => blocked.fmt(f),
237 }
238 }
239}
240
241impl std::error::Error for ClientError {}
242
/// Errors produced throughout the connector pipeline.
#[derive(Debug)]
pub enum ConnectorError {
    /// A provider with this id is already registered.
    DuplicateProvider(String),
    /// The same delivery was received more than once.
    DuplicateDelivery(String),
    /// No provider is registered under this id.
    UnknownProvider(String),
    /// A required HTTP header was absent.
    MissingHeader(String),
    /// A header was present but malformed.
    InvalidHeader {
        name: String,
        detail: String,
    },
    /// Signature verification failed.
    InvalidSignature(String),
    /// Delivery timestamp fell outside the allowed verification window.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON (de)serialization failure.
    Json(String),
    /// Secret lookup/decryption failure.
    Secret(String),
    /// Event-log append/read failure.
    EventLog(String),
    /// Failure inside the harn runtime.
    HarnRuntime(String),
    /// Error surfaced from a [`ConnectorClient`] call.
    Client(ClientError),
    /// Operation not supported by this connector.
    Unsupported(String),
    /// Binding activation failure.
    Activation(String),
}
268
269impl ConnectorError {
270 pub fn invalid_signature(message: impl Into<String>) -> Self {
271 Self::InvalidSignature(message.into())
272 }
273}
274
275impl fmt::Display for ConnectorError {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 match self {
278 Self::DuplicateProvider(provider) => {
279 write!(f, "connector provider `{provider}` is already registered")
280 }
281 Self::DuplicateDelivery(message) => message.fmt(f),
282 Self::UnknownProvider(provider) => {
283 write!(f, "connector provider `{provider}` is not registered")
284 }
285 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
286 Self::InvalidHeader { name, detail } => {
287 write!(f, "invalid header `{name}`: {detail}")
288 }
289 Self::InvalidSignature(message)
290 | Self::Json(message)
291 | Self::Secret(message)
292 | Self::EventLog(message)
293 | Self::HarnRuntime(message)
294 | Self::Unsupported(message)
295 | Self::Activation(message) => message.fmt(f),
296 Self::TimestampOutOfWindow {
297 timestamp,
298 now,
299 window,
300 } => write!(
301 f,
302 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
303 ),
304 Self::Client(error) => error.fmt(f),
305 }
306 }
307}
308
309impl std::error::Error for ConnectorError {}
310
311impl From<crate::event_log::LogError> for ConnectorError {
312 fn from(value: crate::event_log::LogError) -> Self {
313 Self::EventLog(value.to_string())
314 }
315}
316
317impl From<crate::secrets::SecretError> for ConnectorError {
318 fn from(value: crate::secrets::SecretError) -> Self {
319 Self::Secret(value.to_string())
320 }
321}
322
323impl From<serde_json::Error> for ConnectorError {
324 fn from(value: serde_json::Error) -> Self {
325 Self::Json(value.to_string())
326 }
327}
328
329impl From<ClientError> for ConnectorError {
330 fn from(value: ClientError) -> Self {
331 Self::Client(value)
332 }
333}
334
/// Shared runtime services handed to each connector at `init` time.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Durable event log for audit/trigger topics.
    pub event_log: Arc<AnyEventLog>,
    /// Source of provider credentials and signing secrets.
    pub secrets: Arc<dyn SecretProvider>,
    /// Dedupe index for inbound deliveries.
    pub inbox: Arc<InboxIndex>,
    /// Metrics sink shared across connectors.
    pub metrics: Arc<MetricsRegistry>,
    /// Factory for per-connector rate limiters.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
344
/// Point-in-time copy of the registry's fixed (atomic) counters.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
361
/// Label set for a metric sample; BTreeMap gives a stable, hashable ordering.
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one Prometheus histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative bucket counts keyed by the rendered `le` bound
    /// (including "+Inf").
    buckets: BTreeMap<String, u64>,
    /// Total number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
370
371static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
372
373pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
374 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
375 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
376}
377
378pub fn clear_active_metrics_registry() {
379 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
380 *slot.lock().expect("active metrics registry poisoned") = None;
381 }
382}
383
384pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
385 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
386 slot.lock()
387 .expect("active metrics registry poisoned")
388 .clone()
389 })
390}
391
/// In-process metrics store: fixed atomic counters for hot paths, plus
/// generic mutex-guarded counter/gauge/histogram families rendered in
/// Prometheus exposition format.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed counters (lock-free, Relaxed ordering — totals only, no
    // cross-counter consistency is required).
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Ad-hoc named counters rendered as `connector_custom_<name>_total`.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Generic labeled families keyed by (metric name, labels).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events: labels → (event id → accepted-at millis),
    // used to derive the oldest-pending-age gauge.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
413
// NOTE: this impl calls module-level helpers (`labels`,
// `trigger_lifecycle_labels`, `trigger_pending_labels`, `prometheus_float`,
// `millis_delta`, `render_sample`, `metric_family_names`) defined elsewhere
// in this file and not visible in this view.
impl MetricsRegistry {
    /// Bucket bounds (seconds) for request-scale duration histograms.
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Wider bounds (seconds) for trigger lifecycle latencies, up to 5 min.
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket bounds (bytes) for payload-size histograms, 128 B – 10 MiB.
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];

    /// Takes a point-in-time copy of the fixed atomic counters.
    pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
        ConnectorMetricsSnapshot {
            inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
            inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
            inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
            inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
            inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
            inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
            linear_timestamp_rejections_total: self
                .linear_timestamp_rejections_total
                .load(Ordering::Relaxed),
            dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
            dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
            retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
            slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
            slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
        }
    }

    /// Counts one dedupe claim written to the inbox index.
    pub(crate) fn record_inbox_claim(&self) {
        self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected via the in-memory fast path.
    pub(crate) fn record_inbox_duplicate_fast_path(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a duplicate rejected via the durable store.
    pub(crate) fn record_inbox_duplicate_durable(&self) {
        self.inbox_duplicates_rejected
            .fetch_add(1, Ordering::Relaxed);
        self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` expired inbox entries (no-op for zero).
    pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
        if count > 0 {
            self.inbox_expired_entries
                .fetch_add(count, Ordering::Relaxed);
        }
    }

    /// Overwrites the live inbox-entry gauge with the current count.
    pub(crate) fn set_inbox_active_entries(&self, count: usize) {
        self.inbox_active_entries
            .store(count as u64, Ordering::Relaxed);
    }

    /// Counts one Linear webhook rejected for a stale/out-of-window timestamp.
    pub fn record_linear_timestamp_rejection(&self) {
        self.linear_timestamp_rejections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful dispatch.
    pub fn record_dispatch_succeeded(&self) {
        self.dispatch_succeeded_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed dispatch.
    pub fn record_dispatch_failed(&self) {
        self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one retry being scheduled.
    pub fn record_retry_scheduled(&self) {
        self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one successful Slack delivery.
    pub fn record_slack_delivery_success(&self) {
        self.slack_delivery_success_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one failed Slack delivery.
    pub fn record_slack_delivery_failure(&self) {
        self.slack_delivery_failure_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `amount` to a named ad-hoc counter. Zero amounts are skipped,
    /// so a counter only appears in output once it has been incremented.
    pub fn record_custom_counter(&self, name: &str, amount: u64) {
        if amount == 0 {
            return;
        }
        let mut counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        *counters.entry(name.to_string()).or_default() += amount;
    }

    /// Records one inbound HTTP request: a labeled request counter plus
    /// duration and body-size histograms per endpoint.
    pub fn record_http_request(
        &self,
        endpoint: &str,
        method: &str,
        status: u16,
        duration: StdDuration,
        body_size_bytes: usize,
    ) {
        self.increment_counter(
            "harn_http_requests_total",
            labels([
                ("endpoint", endpoint),
                ("method", method),
                ("status", &status.to_string()),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_http_request_duration_seconds",
            labels([("endpoint", endpoint)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.observe_histogram(
            "harn_http_body_size_bytes",
            labels([("endpoint", endpoint)]),
            body_size_bytes as f64,
            &Self::SIZE_BUCKETS,
        );
    }

    /// Counts one trigger event received for a (trigger, provider) pair.
    pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
        self.increment_counter(
            "harn_trigger_received_total",
            labels([("trigger_id", trigger_id), ("provider", provider)]),
            1,
        );
    }

    /// Counts one trigger event dropped as a duplicate, labeled by reason.
    pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_deduped_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Records one predicate evaluation: a result-labeled counter plus a
    /// cost histogram (cost clamped to be non-negative).
    pub fn record_trigger_predicate_evaluation(
        &self,
        trigger_id: &str,
        result: bool,
        cost_usd: f64,
    ) {
        self.increment_counter(
            "harn_trigger_predicate_evaluations_total",
            labels([
                ("trigger_id", trigger_id),
                ("result", if result { "true" } else { "false" }),
            ]),
            1,
        );
        self.observe_histogram(
            "harn_trigger_predicate_cost_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
            &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
        );
    }

    /// Counts one dispatch, labeled by handler kind and outcome.
    pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
        self.increment_counter(
            "harn_trigger_dispatched_total",
            labels([
                ("trigger_id", trigger_id),
                ("handler_kind", handler_kind),
                ("outcome", outcome),
            ]),
            1,
        );
    }

    /// Counts one retry, labeled by attempt number.
    pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
        self.increment_counter(
            "harn_trigger_retries_total",
            labels([
                ("trigger_id", trigger_id),
                ("attempt", &attempt.to_string()),
            ]),
            1,
        );
    }

    /// Counts one event sent to the dead-letter queue, labeled by reason.
    pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
        self.increment_counter(
            "harn_trigger_dlq_total",
            labels([("trigger_id", trigger_id), ("reason", reason)]),
            1,
        );
    }

    /// Observes webhook accepted→normalized latency.
    pub fn record_trigger_accepted_to_normalized(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_normalized_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes webhook accepted→queue-append latency.
    pub fn record_trigger_accepted_to_queue_append(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_webhook_accepted_to_queue_append_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes queue age when an event is admitted for dispatch.
    pub fn record_trigger_queue_age_at_dispatch_admission(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_admission_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes queue age when dispatch actually starts.
    pub fn record_trigger_queue_age_at_dispatch_start(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        age: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_queue_age_at_dispatch_start_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            age.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes the handler runtime of one dispatch.
    pub fn record_trigger_dispatch_runtime(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_dispatch_runtime_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes the delay before a scheduled retry runs.
    pub fn record_trigger_retry_delay(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_retry_delay_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Observes accepted→DLQ latency for events that end in the DLQ.
    pub fn record_trigger_accepted_to_dlq(
        &self,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        status: &str,
        duration: StdDuration,
    ) {
        self.observe_histogram(
            "harn_trigger_accepted_to_dlq_seconds",
            trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
            duration.as_secs_f64(),
            &Self::TRIGGER_LATENCY_BUCKETS,
        );
    }

    /// Registers a pending trigger event and refreshes the oldest-pending
    /// gauge. The map lock is released (inner scope) before the gauge
    /// refresh re-acquires it.
    pub fn note_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        accepted_at_ms: i64,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            pending
                .entry(labels.clone())
                .or_default()
                .insert(event_id.to_string(), accepted_at_ms);
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Removes a pending trigger event (dropping the label group when it
    /// empties) and refreshes the oldest-pending gauge.
    pub fn clear_trigger_pending_event(
        &self,
        event_id: &str,
        trigger_id: &str,
        binding_key: &str,
        provider: &str,
        tenant_id: Option<&str>,
        now_ms: i64,
    ) {
        let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
        {
            let mut pending = self
                .pending_trigger_events
                .lock()
                .expect("pending trigger events poisoned");
            if let Some(events) = pending.get_mut(&labels) {
                events.remove(event_id);
                if events.is_empty() {
                    pending.remove(&labels);
                }
            }
        }
        self.refresh_oldest_pending_gauge(labels, now_ms);
    }

    /// Sets the in-flight gauge for a trigger.
    pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
        self.set_gauge(
            "harn_trigger_inflight",
            labels([("trigger_id", trigger_id)]),
            count as f64,
        );
    }

    /// Sets today's spent-budget gauge (clamped non-negative).
    pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
        self.set_gauge(
            "harn_trigger_budget_cost_today_usd",
            labels([("trigger_id", trigger_id)]),
            cost_usd.max(0.0),
        );
    }

    /// Counts one budget-exhaustion event, labeled by handling strategy.
    pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
        self.increment_counter(
            "harn_trigger_budget_exhausted_total",
            labels([("trigger_id", trigger_id), ("strategy", strategy)]),
            1,
        );
    }

    /// Counts one backpressure action on the given dimension.
    pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
        self.increment_counter(
            "harn_backpressure_events_total",
            labels([("dimension", dimension), ("action", action)]),
            1,
        );
    }

    /// Records one event-log append: duration histogram plus a gauge of the
    /// last payload size for the topic.
    pub fn record_event_log_append(
        &self,
        topic: &str,
        duration: StdDuration,
        payload_bytes: usize,
    ) {
        self.observe_histogram(
            "harn_event_log_append_duration_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
        self.set_gauge(
            "harn_event_log_topic_size_bytes",
            labels([("topic", topic)]),
            payload_bytes as f64,
        );
    }

    /// Sets the per-consumer lag gauge for an event-log topic.
    pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
        self.set_gauge(
            "harn_event_log_consumer_lag",
            labels([("topic", topic), ("consumer", consumer)]),
            lag as f64,
        );
    }

    /// Records one agent-to-agent hop: outcome counter plus duration
    /// histogram per target.
    pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
        self.increment_counter(
            "harn_a2a_hops_total",
            labels([("target", target), ("outcome", outcome)]),
            1,
        );
        self.observe_histogram(
            "harn_a2a_hop_duration_seconds",
            labels([("target", target)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Sets the depth gauge for a worker queue.
    pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
        self.set_gauge(
            "harn_worker_queue_depth",
            labels([("queue", queue)]),
            depth as f64,
        );
    }

    /// Observes how old a queue item was when claimed (clamped non-negative).
    pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
        self.observe_histogram(
            "harn_worker_queue_claim_age_seconds",
            labels([("queue", queue)]),
            age_seconds.max(0.0),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Counts one scheduler selection for a fairness key.
    pub fn record_scheduler_selection(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_selections_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Counts one scheduler deferral for a fairness key.
    pub fn record_scheduler_deferral(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_deferrals_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Counts one starvation-driven promotion for a fairness key.
    pub fn record_scheduler_starvation_promotion(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
    ) {
        self.increment_counter(
            "harn_scheduler_starvation_promotions_total",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            1,
        );
    }

    /// Sets the scheduler deficit gauge for a fairness key.
    pub fn set_scheduler_deficit(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        deficit: i64,
    ) {
        self.set_gauge(
            "harn_scheduler_deficit",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            deficit as f64,
        );
    }

    /// Sets the oldest-eligible-age gauge, converting milliseconds to seconds.
    pub fn set_scheduler_oldest_eligible_age(
        &self,
        queue: &str,
        fairness_dimension: &str,
        fairness_key: &str,
        age_ms: u64,
    ) {
        self.set_gauge(
            "harn_scheduler_oldest_eligible_age_seconds",
            labels([
                ("queue", queue),
                ("fairness_dimension", fairness_dimension),
                ("fairness_key", fairness_key),
            ]),
            age_ms as f64 / 1000.0,
        );
    }

    /// Sets the orchestrator pump backlog gauge for a topic.
    pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
        self.set_gauge(
            "harn_orchestrator_pump_backlog",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Sets the orchestrator pump outstanding-work gauge for a topic.
    pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
        self.set_gauge(
            "harn_orchestrator_pump_outstanding",
            labels([("topic", topic)]),
            count as f64,
        );
    }

    /// Observes the admission delay of one pumped item.
    pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
        self.observe_histogram(
            "harn_orchestrator_pump_admission_delay_seconds",
            labels([("topic", topic)]),
            duration.as_secs_f64(),
            &Self::DURATION_BUCKETS,
        );
    }

    /// Records one LLM call. Zero/negative cost still materializes the cost
    /// counter series at 0 (via `ensure_counter`) so the series exists for
    /// scrapers even before any spend occurs.
    pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
        self.increment_counter(
            "harn_llm_calls_total",
            labels([
                ("provider", provider),
                ("model", model),
                ("outcome", outcome),
            ]),
            1,
        );
        if cost_usd > 0.0 {
            self.increment_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
                cost_usd,
            );
        } else {
            self.ensure_counter(
                "harn_llm_cost_usd_total",
                labels([("provider", provider), ("model", model)]),
            );
        }
    }

    /// Counts one LLM cache hit for a provider.
    pub fn record_llm_cache_hit(&self, provider: &str) {
        self.increment_counter(
            "harn_llm_cache_hits_total",
            labels([("provider", provider)]),
            1,
        );
    }

    /// Renders all metrics in Prometheus text exposition format: the fixed
    /// snapshot counters, sanitized custom counters, two hard-coded Slack
    /// auto-disable configuration gauges, then the generic families.
    pub fn render_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        let counters = [
            (
                "connector_linear_timestamp_rejections_total",
                snapshot.linear_timestamp_rejections_total,
            ),
            (
                "dispatch_succeeded_total",
                snapshot.dispatch_succeeded_total,
            ),
            ("dispatch_failed_total", snapshot.dispatch_failed_total),
            ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
            ("retry_scheduled_total", snapshot.retry_scheduled_total),
            (
                "slack_events_delivery_success_total",
                snapshot.slack_delivery_success_total,
            ),
            (
                "slack_events_delivery_failure_total",
                snapshot.slack_delivery_failure_total,
            ),
        ];

        let mut rendered = String::new();
        for (name, value) in counters {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            rendered.push_str(name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        let custom_counters = self
            .custom_counters
            .lock()
            .expect("custom counters poisoned");
        for (name, value) in custom_counters.iter() {
            // Sanitize the user-supplied name: any character outside
            // [A-Za-z0-9_] becomes '_' to keep the metric name legal.
            let metric_name = format!(
                "connector_custom_{}_total",
                name.chars()
                    .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
                        ch
                    } else {
                        '_'
                    })
                    .collect::<String>()
            );
            rendered.push_str("# TYPE ");
            rendered.push_str(&metric_name);
            rendered.push_str(" counter\n");
            rendered.push_str(&metric_name);
            rendered.push(' ');
            rendered.push_str(&value.to_string());
            rendered.push('\n');
        }
        rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
        rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
        rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
        rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
        self.render_generic_metrics(&mut rendered);
        rendered
    }

    /// Adds `amount` to a labeled counter; non-positive and non-finite
    /// amounts are ignored (counters must only move up).
    fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
        let amount = amount.into();
        if amount <= 0.0 || !amount.is_finite() {
            return;
        }
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        *counters.entry((name.to_string(), labels)).or_default() += amount;
    }

    /// Materializes a counter series at 0 without incrementing it.
    fn ensure_counter(&self, name: &str, labels: MetricLabels) {
        let mut counters = self.counters.lock().expect("metrics counters poisoned");
        counters.entry((name.to_string(), labels)).or_default();
    }

    /// Overwrites a labeled gauge with `value`.
    fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
        let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
        gauges.insert((name.to_string(), labels), value);
    }

    /// Folds one observation into a labeled histogram, maintaining
    /// Prometheus cumulative-bucket semantics (every bucket whose bound is
    /// >= value, plus "+Inf", is incremented).
    ///
    /// NOTE(review): bucket keys are fixed when the series is first created;
    /// a later call with different `bucket_bounds` for the same (name,
    /// labels) would add extra bucket keys — presumably each metric name is
    /// always observed with the same bounds; confirm at call sites.
    fn observe_histogram(
        &self,
        name: &str,
        labels: MetricLabels,
        value: f64,
        bucket_bounds: &[f64],
    ) {
        if !value.is_finite() {
            return;
        }
        let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
        let histogram = histograms
            .entry((name.to_string(), labels))
            .or_insert_with(|| HistogramMetric {
                buckets: bucket_bounds
                    .iter()
                    .map(|bound| (prometheus_float(*bound), 0))
                    .chain(std::iter::once(("+Inf".to_string(), 0)))
                    .collect(),
                count: 0,
                sum: 0.0,
            });
        histogram.count += 1;
        histogram.sum += value;
        for bound in bucket_bounds {
            if value <= *bound {
                let key = prometheus_float(*bound);
                *histogram.buckets.entry(key).or_default() += 1;
            }
        }
        *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
    }

    /// Recomputes the oldest-pending-age gauge for one label set; 0 when no
    /// events are pending for those labels.
    fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
        let oldest_accepted_at_ms = self
            .pending_trigger_events
            .lock()
            .expect("pending trigger events poisoned")
            .get(&labels)
            .and_then(|events| events.values().min().copied());
        let age_seconds = oldest_accepted_at_ms
            .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
            .unwrap_or(0.0);
        self.set_gauge(
            "harn_trigger_oldest_pending_age_seconds",
            labels,
            age_seconds,
        );
    }

    /// Appends the generic counter/gauge/histogram families to `rendered`,
    /// grouped by family name (clones each map so the locks are not held
    /// while formatting).
    fn render_generic_metrics(&self, rendered: &mut String) {
        let counters = self
            .counters
            .lock()
            .expect("metrics counters poisoned")
            .clone();
        let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
        let histograms = self
            .histograms
            .lock()
            .expect("metrics histograms poisoned")
            .clone();

        for name in metric_family_names(MetricKind::Counter) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" counter\n");
            for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Gauge) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" gauge\n");
            for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
                render_sample(rendered, sample_name, labels, *value);
            }
        }
        for name in metric_family_names(MetricKind::Histogram) {
            rendered.push_str("# TYPE ");
            rendered.push_str(name);
            rendered.push_str(" histogram\n");
            for ((sample_name, labels), histogram) in
                histograms.iter().filter(|((n, _), _)| n == name)
            {
                for (le, value) in &histogram.buckets {
                    let mut bucket_labels = labels.clone();
                    bucket_labels.insert("le".to_string(), le.clone());
                    render_sample(
                        rendered,
                        &format!("{sample_name}_bucket"),
                        &bucket_labels,
                        *value as f64,
                    );
                }
                render_sample(
                    rendered,
                    &format!("{sample_name}_sum"),
                    labels,
                    histogram.sum,
                );
                render_sample(
                    rendered,
                    &format!("{sample_name}_count"),
                    labels,
                    histogram.count as f64,
                );
            }
        }
    }
}
1213
/// Which Prometheus family a metric name belongs to; drives `# TYPE` rendering.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Every metric family name this registry can emit, grouped by kind.
///
/// Rendering iterates these lists in order, so list order determines the
/// order of `# TYPE` sections in the exposition output.
fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
    const COUNTER_FAMILIES: &[&str] = &[
        "harn_http_requests_total",
        "harn_trigger_received_total",
        "harn_trigger_deduped_total",
        "harn_trigger_predicate_evaluations_total",
        "harn_trigger_dispatched_total",
        "harn_trigger_retries_total",
        "harn_trigger_dlq_total",
        "harn_trigger_budget_exhausted_total",
        "harn_backpressure_events_total",
        "harn_a2a_hops_total",
        "harn_llm_calls_total",
        "harn_llm_cost_usd_total",
        "harn_llm_cache_hits_total",
        "harn_scheduler_selections_total",
        "harn_scheduler_deferrals_total",
        "harn_scheduler_starvation_promotions_total",
    ];
    const GAUGE_FAMILIES: &[&str] = &[
        "harn_trigger_inflight",
        "harn_event_log_topic_size_bytes",
        "harn_event_log_consumer_lag",
        "harn_trigger_budget_cost_today_usd",
        "harn_worker_queue_depth",
        "harn_orchestrator_pump_backlog",
        "harn_orchestrator_pump_outstanding",
        "harn_trigger_oldest_pending_age_seconds",
        "harn_scheduler_deficit",
        "harn_scheduler_oldest_eligible_age_seconds",
    ];
    const HISTOGRAM_FAMILIES: &[&str] = &[
        "harn_http_request_duration_seconds",
        "harn_http_body_size_bytes",
        "harn_trigger_predicate_cost_usd",
        "harn_event_log_append_duration_seconds",
        "harn_a2a_hop_duration_seconds",
        "harn_worker_queue_claim_age_seconds",
        "harn_orchestrator_pump_admission_delay_seconds",
        "harn_trigger_webhook_accepted_to_normalized_seconds",
        "harn_trigger_webhook_accepted_to_queue_append_seconds",
        "harn_trigger_queue_age_at_dispatch_admission_seconds",
        "harn_trigger_queue_age_at_dispatch_start_seconds",
        "harn_trigger_dispatch_runtime_seconds",
        "harn_trigger_retry_delay_seconds",
        "harn_trigger_accepted_to_dlq_seconds",
    ];

    match kind {
        MetricKind::Counter => COUNTER_FAMILIES,
        MetricKind::Gauge => GAUGE_FAMILIES,
        MetricKind::Histogram => HISTOGRAM_FAMILIES,
    }
}
1271
1272fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1273 pairs
1274 .into_iter()
1275 .map(|(name, value)| (name.to_string(), value.to_string()))
1276 .collect()
1277}
1278
1279fn trigger_lifecycle_labels(
1280 trigger_id: &str,
1281 binding_key: &str,
1282 provider: &str,
1283 tenant_id: Option<&str>,
1284 status: &str,
1285) -> MetricLabels {
1286 labels([
1287 ("binding_key", binding_key),
1288 ("provider", provider),
1289 ("status", status),
1290 ("tenant_id", tenant_label(tenant_id)),
1291 ("trigger_id", trigger_id),
1292 ])
1293}
1294
1295fn trigger_pending_labels(
1296 trigger_id: &str,
1297 binding_key: &str,
1298 provider: &str,
1299 tenant_id: Option<&str>,
1300) -> MetricLabels {
1301 labels([
1302 ("binding_key", binding_key),
1303 ("provider", provider),
1304 ("tenant_id", tenant_label(tenant_id)),
1305 ("trigger_id", trigger_id),
1306 ])
1307}
1308
/// Normalizes an optional tenant id into a label value.
///
/// Missing, empty, or whitespace-only tenant ids collapse to `"none"`;
/// otherwise the trimmed id is returned.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1315
/// Duration between two millisecond timestamps, clamped to zero when
/// `earlier_ms` is actually later (never negative, never overflows).
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let clamped_ms = later_ms.saturating_sub(earlier_ms).max(0);
    // `clamped_ms` is guaranteed non-negative, so the conversion cannot fail.
    StdDuration::from_millis(u64::try_from(clamped_ms).unwrap_or(0))
}
1319
1320fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1321 rendered.push_str(name);
1322 if !labels.is_empty() {
1323 rendered.push('{');
1324 for (index, (label, label_value)) in labels.iter().enumerate() {
1325 if index > 0 {
1326 rendered.push(',');
1327 }
1328 rendered.push_str(label);
1329 rendered.push_str("=\"");
1330 rendered.push_str(&escape_label_value(label_value));
1331 rendered.push('"');
1332 }
1333 rendered.push('}');
1334 }
1335 rendered.push(' ');
1336 rendered.push_str(&prometheus_float(value));
1337 rendered.push('\n');
1338}
1339
/// Escapes a label value for the Prometheus text exposition format:
/// backslash, double quote, and newline become `\\`, `\"`, and `\n`.
///
/// Single pass into a pre-sized `String`; the previous `flat_map` version
/// allocated a `Vec<char>` per character.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1351
/// Formats a float the way the Prometheus text format expects.
///
/// Infinities render as `+Inf`/`-Inf` and NaN as `NaN` (the previous version
/// fell through to `{:.6}` for negative infinity, producing the invalid token
/// `-inf`). Whole numbers render without a fraction; everything else renders
/// with up to six decimals, trailing zeros trimmed.
fn prometheus_float(value: f64) -> String {
    if value.is_infinite() {
        let rendered = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return rendered.to_string();
    }
    if value.is_nan() {
        // `{:.6}` would also produce "NaN", but being explicit documents it.
        return "NaN".to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1366
/// Describes the payload shape a provider emits.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Harn-side name for the payload schema.
    pub harn_schema_name: String,
    /// JSON-schema document for the payload; `Null` when only the name is known.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1374
1375impl ProviderPayloadSchema {
1376 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1377 Self {
1378 harn_schema_name: harn_schema_name.into(),
1379 json_schema,
1380 }
1381 }
1382
1383 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1384 Self::new(harn_schema_name, JsonValue::Null)
1385 }
1386}
1387
1388impl Default for ProviderPayloadSchema {
1389 fn default() -> Self {
1390 Self::named("raw")
1391 }
1392}
1393
1394#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1396#[serde(transparent)]
1397pub struct TriggerKind(String);
1398
1399impl TriggerKind {
1400 pub fn new(value: impl Into<String>) -> Self {
1401 Self(value.into())
1402 }
1403
1404 pub fn as_str(&self) -> &str {
1405 self.0.as_str()
1406 }
1407}
1408
1409impl From<&str> for TriggerKind {
1410 fn from(value: &str) -> Self {
1411 Self::new(value)
1412 }
1413}
1414
1415impl From<String> for TriggerKind {
1416 fn from(value: String) -> Self {
1417 Self::new(value)
1418 }
1419}
1420
/// Declarative binding of one trigger kind to a provider.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    pub provider: ProviderId,
    pub kind: TriggerKind,
    /// Identifier for this binding (e.g. "github.push" in the tests below).
    pub binding_id: String,
    /// Optional dedupe key; `None` when this binding does not customize
    /// deduplication. NOTE(review): exact semantics live in `crate::triggers`.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Retention window (days) for dedupe records; defaults to the crate-wide
    /// inbox retention constant.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Provider-specific configuration blob; `Null` when unconfigured.
    #[serde(default)]
    pub config: JsonValue,
}
1434
1435impl TriggerBinding {
1436 pub fn new(
1437 provider: ProviderId,
1438 kind: impl Into<TriggerKind>,
1439 binding_id: impl Into<String>,
1440 ) -> Self {
1441 Self {
1442 provider,
1443 kind: kind.into(),
1444 binding_id: binding_id.into(),
1445 dedupe_key: None,
1446 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1447 config: JsonValue::Null,
1448 }
1449 }
1450}
1451
/// Serde default for [`TriggerBinding::dedupe_retention_days`].
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1455
/// In-memory collection of trigger bindings, grouped per provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    // BTreeMap keeps provider iteration order deterministic.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1461
1462impl TriggerRegistry {
1463 pub fn register(&mut self, binding: TriggerBinding) {
1464 self.bindings
1465 .entry(binding.provider.clone())
1466 .or_default()
1467 .push(binding);
1468 }
1469
1470 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1471 &self.bindings
1472 }
1473
1474 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1475 self.bindings
1476 .get(provider)
1477 .map(Vec::as_slice)
1478 .unwrap_or(&[])
1479 }
1480}
1481
/// Result of activating a connector: the provider and how many bindings it
/// was activated with.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    pub provider: ProviderId,
    /// Number of bindings passed to `Connector::activate`.
    pub binding_count: usize,
}
1488
1489impl ActivationHandle {
1490 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1491 Self {
1492 provider,
1493 binding_count,
1494 }
1495 }
1496}
1497
/// An inbound event exactly as received, before connector normalization.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Event kind label supplied by the caller (e.g. "push" in tests).
    pub kind: String,
    pub headers: BTreeMap<String, String>,
    pub query: BTreeMap<String, String>,
    /// Raw request/message body bytes, kept unparsed.
    pub body: Vec<u8>,
    /// When this process received the event (clock-controlled in tests).
    pub received_at: OffsetDateTime,
    /// When the event occurred at the source, if known.
    pub occurred_at: Option<OffsetDateTime>,
    pub tenant_id: Option<TenantId>,
    /// Arbitrary side-channel metadata; `Null` when absent.
    pub metadata: JsonValue,
}
1510
1511impl RawInbound {
1512 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1513 Self {
1514 kind: kind.into(),
1515 headers,
1516 query: BTreeMap::new(),
1517 body,
1518 received_at: clock::now_utc(),
1519 occurred_at: None,
1520 tenant_id: None,
1521 metadata: JsonValue::Null,
1522 }
1523 }
1524
1525 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1526 Ok(serde_json::from_slice(&self.body)?)
1527 }
1528}
1529
/// Token-bucket parameters: a burst `capacity`, refilled by `refill_tokens`
/// every `refill_interval`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    pub capacity: u32,
    pub refill_tokens: u32,
    pub refill_interval: StdDuration,
}
1537
impl Default for RateLimitConfig {
    /// Burst of 60 tokens, refilled at one token per second
    /// (~60 requests/minute sustained).
    fn default() -> Self {
        Self {
            capacity: 60,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(1),
        }
    }
}
1547
/// Mutable token-bucket state; the governing `RateLimitConfig` is passed in
/// on every operation rather than stored here.
#[derive(Clone, Debug)]
struct TokenBucket {
    // Fractional tokens accumulate between refills.
    tokens: f64,
    // Instant of the last refill; elapsed time since then earns new tokens.
    last_refill: ClockInstant,
}
1553
1554impl TokenBucket {
1555 fn full(config: RateLimitConfig) -> Self {
1556 Self {
1557 tokens: config.capacity as f64,
1558 last_refill: clock::instant_now(),
1559 }
1560 }
1561
1562 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1563 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1564 let rate = config.refill_tokens.max(1) as f64 / interval;
1565 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1566 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1567 self.last_refill = now;
1568 }
1569
1570 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1571 self.refill(config, now);
1572 if self.tokens >= 1.0 {
1573 self.tokens -= 1.0;
1574 true
1575 } else {
1576 false
1577 }
1578 }
1579
1580 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1581 if self.tokens >= 1.0 {
1582 return StdDuration::ZERO;
1583 }
1584 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1585 let rate = config.refill_tokens.max(1) as f64 / interval;
1586 let missing = (1.0 - self.tokens).max(0.0);
1587 StdDuration::from_secs_f64((missing / rate).max(0.001))
1588 }
1589}
1590
/// Creates and caches token buckets keyed by `(provider, scope key)`, all
/// sharing one `RateLimitConfig`.
#[derive(Debug)]
pub struct RateLimiterFactory {
    config: RateLimitConfig,
    // One bucket per (provider, key), created lazily at full capacity.
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1597
1598impl RateLimiterFactory {
1599 pub fn new(config: RateLimitConfig) -> Self {
1600 Self {
1601 config,
1602 buckets: Mutex::new(HashMap::new()),
1603 }
1604 }
1605
1606 pub fn config(&self) -> RateLimitConfig {
1607 self.config
1608 }
1609
1610 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1611 ScopedRateLimiter {
1612 factory: self,
1613 provider: provider.clone(),
1614 key: key.into(),
1615 }
1616 }
1617
1618 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1619 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1620 let bucket = buckets
1621 .entry((provider.as_str().to_string(), key.to_string()))
1622 .or_insert_with(|| TokenBucket::full(self.config));
1623 bucket.try_acquire(self.config, clock::instant_now())
1624 }
1625
1626 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1627 loop {
1628 let wait = {
1629 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1630 let bucket = buckets
1631 .entry((provider.as_str().to_string(), key.to_string()))
1632 .or_insert_with(|| TokenBucket::full(self.config));
1633 if bucket.try_acquire(self.config, clock::instant_now()) {
1634 return;
1635 }
1636 bucket.wait_duration(self.config)
1637 };
1638 tokio::time::sleep(wait).await;
1639 }
1640 }
1641}
1642
impl Default for RateLimiterFactory {
    /// Factory using the default 60-token / 1-per-second configuration.
    fn default() -> Self {
        Self::new(RateLimitConfig::default())
    }
}
1648
/// Rate limiter pre-bound to one `(provider, key)` scope; borrows its factory.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    factory: &'a RateLimiterFactory,
    provider: ProviderId,
    key: String,
}
1656
impl<'a> ScopedRateLimiter<'a> {
    /// Non-blocking acquire against the bound scope; delegates to the factory.
    pub fn try_acquire(&self) -> bool {
        self.factory.try_acquire(&self.provider, &self.key)
    }

    /// Awaits a token for the bound scope; delegates to the factory.
    pub async fn acquire(&self) {
        self.factory.acquire(&self.provider, &self.key).await;
    }
}
1666
/// Holds at most one connector per provider, each behind an async mutex so
/// callers can await exclusive access (see `register`).
pub struct ConnectorRegistry {
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1671
1672impl ConnectorRegistry {
1673 pub fn empty() -> Self {
1674 Self {
1675 connectors: BTreeMap::new(),
1676 }
1677 }
1678
1679 pub fn with_defaults() -> Self {
1680 let mut registry = Self::empty();
1681 for provider in registered_provider_metadata() {
1682 if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1683 continue;
1684 }
1685 registry
1686 .register(default_connector_for_provider(&provider))
1687 .expect("default connector registration should not fail");
1688 }
1689 registry
1690 }
1691
1692 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1693 let provider = connector.provider_id().clone();
1694 if self.connectors.contains_key(&provider) {
1695 return Err(ConnectorError::DuplicateProvider(provider.0));
1696 }
1697 self.connectors
1698 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1699 Ok(())
1700 }
1701
1702 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1703 self.connectors.get(id).cloned()
1704 }
1705
1706 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1707 self.connectors.remove(id)
1708 }
1709
1710 pub fn list(&self) -> Vec<ProviderId> {
1711 self.connectors.keys().cloned().collect()
1712 }
1713
1714 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1715 for connector in self.connectors.values() {
1716 connector.lock().await.init(ctx.clone()).await?;
1717 }
1718 Ok(())
1719 }
1720
1721 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1722 let mut clients = BTreeMap::new();
1723 for (provider, connector) in &self.connectors {
1724 let client = connector.lock().await.client();
1725 clients.insert(provider.clone(), client);
1726 }
1727 clients
1728 }
1729
1730 pub async fn activate_all(
1731 &self,
1732 registry: &TriggerRegistry,
1733 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1734 let mut handles = Vec::new();
1735 for (provider, connector) in &self.connectors {
1736 let bindings = registry.bindings_for(provider);
1737 if bindings.is_empty() {
1738 continue;
1739 }
1740 let connector = connector.lock().await;
1741 handles.push(connector.activate(bindings).await?);
1742 }
1743 Ok(handles)
1744 }
1745}
1746
impl Default for ConnectorRegistry {
    /// Defaults to the builtin-catalog-populated registry.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1752
/// Instantiates the concrete connector for a catalog provider entry.
///
/// Builtin runtimes dispatch on the catalog's `connector` string; any
/// unrecognized builtin string, and all placeholder runtimes, fall back to a
/// `PlaceholderConnector` that rejects inbound events.
fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
    match &provider.runtime {
        ProviderRuntimeMetadata::Builtin {
            connector,
            default_signature_variant,
        } => match connector.as_str() {
            "a2a-push" => Box::new(A2aPushConnector::new()),
            "cron" => Box::new(CronConnector::new()),
            // Stream connectors carry the provider id and schema name through.
            "stream" => Box::new(StreamConnector::new(
                ProviderId::from(provider.provider.clone()),
                provider.schema_name.clone(),
            )),
            "webhook" => {
                // Catalog-supplied signature variant must parse; a bad catalog
                // entry is a programming error, hence the expect.
                let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
                    .expect("catalog webhook signature variant must be valid");
                Box::new(GenericWebhookConnector::with_profile(
                    WebhookProviderProfile::new(
                        ProviderId::from(provider.provider.clone()),
                        provider.schema_name.clone(),
                        variant,
                    ),
                ))
            }
            _ => Box::new(PlaceholderConnector::from_metadata(provider)),
        },
        ProviderRuntimeMetadata::Placeholder => {
            Box::new(PlaceholderConnector::from_metadata(provider))
        }
    }
}
1783
/// Connector for cataloged providers that have no concrete runtime yet:
/// activation succeeds, but normalizing inbound events always fails.
struct PlaceholderConnector {
    provider_id: ProviderId,
    kinds: Vec<TriggerKind>,
    schema_name: String,
}
1789
1790impl PlaceholderConnector {
1791 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1792 Self {
1793 provider_id: ProviderId::from(metadata.provider.clone()),
1794 kinds: metadata
1795 .kinds
1796 .iter()
1797 .cloned()
1798 .map(TriggerKind::from)
1799 .collect(),
1800 schema_name: metadata.schema_name.clone(),
1801 }
1802 }
1803}
1804
/// Client paired with `PlaceholderConnector`: every call fails with a
/// descriptive "not implemented" error.
struct PlaceholderClient;

#[async_trait]
impl ConnectorClient for PlaceholderClient {
    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
        Err(ClientError::Other(format!(
            "connector client method '{method}' is not implemented for this provider"
        )))
    }
}
1815
1816#[async_trait]
1817impl Connector for PlaceholderConnector {
1818 fn provider_id(&self) -> &ProviderId {
1819 &self.provider_id
1820 }
1821
1822 fn kinds(&self) -> &[TriggerKind] {
1823 &self.kinds
1824 }
1825
1826 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1827 Ok(())
1828 }
1829
1830 async fn activate(
1831 &self,
1832 bindings: &[TriggerBinding],
1833 ) -> Result<ActivationHandle, ConnectorError> {
1834 Ok(ActivationHandle::new(
1835 self.provider_id.clone(),
1836 bindings.len(),
1837 ))
1838 }
1839
1840 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1841 Err(ConnectorError::Unsupported(format!(
1842 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1843 self.provider_id.as_str()
1844 )))
1845 }
1846
1847 fn payload_schema(&self) -> ProviderPayloadSchema {
1848 ProviderPayloadSchema::named(self.schema_name.clone())
1849 }
1850
1851 fn client(&self) -> Arc<dyn ConnectorClient> {
1852 Arc::new(PlaceholderClient)
1853 }
1854}
1855
1856pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1857 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1858 *slot.borrow_mut() = clients
1859 .into_iter()
1860 .map(|(provider, client)| (provider.as_str().to_string(), client))
1861 .collect();
1862 });
1863}
1864
1865pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1866 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1867}
1868
/// Drops every connector client installed on this thread.
pub fn clear_active_connector_clients() {
    ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
}
1872
1873#[cfg(test)]
1874mod tests {
1875 use super::*;
1876
1877 use std::sync::atomic::{AtomicUsize, Ordering};
1878
1879 use async_trait::async_trait;
1880 use serde_json::json;
1881
    /// Stub client: every call succeeds and echoes the method name back.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }
1890
    /// Minimal connector for registry tests.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        // Shared counter incremented by `activate`; asserted on by tests.
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }
1906
    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Counts invocations so tests can assert which connectors activated.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }
1949
    // A second registration under the same provider id must fail with
    // DuplicateProvider carrying that id.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }
1969
    // Only providers with at least one trigger binding get activated; the
    // unbound slack connector must never see an `activate` call.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        // Two bindings for github, none for slack.
        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }
2001
    // Buckets are independent per (provider, key): draining one scope's single
    // token must not affect other keys or other providers.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }
2015
    // `json_body` parses the stored body bytes without mutating them.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }
2026
    // The default registry contains core builtin providers and, notably, does
    // NOT contain service providers like github/slack (see the pivot test).
    #[test]
    fn connector_registry_lists_core_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("webhook")));
        assert!(providers.contains(&ProviderId::from("kafka")));
        assert!(!providers.contains(&ProviderId::from("github")));
        assert!(!providers.contains(&ProviderId::from("slack")));
    }
2037
    // Guardrail: only this fixed allowlist of transport-level providers may be
    // Rust builtins; new service connectors must ship as pure-Harn packages.
    #[test]
    fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
        let core_runtime_providers = [
            "a2a-push",
            "cron",
            "email",
            "kafka",
            "nats",
            "postgres-cdc",
            "pulsar",
            "webhook",
            "websocket",
        ];

        for provider in registered_provider_metadata() {
            if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
                continue;
            }

            let allowed_core = core_runtime_providers.contains(&provider.provider.as_str());
            assert!(
                allowed_core,
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }
2066
    // End-to-end smoke test for the Prometheus renderer: record at least one
    // sample for every metric family, render, then assert each expected
    // exposition line appears verbatim.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        // HTTP ingress.
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        // Trigger lifecycle counters/gauges.
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        // Event log, budget, A2A, worker queue, and pump metrics.
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        // Lifecycle latency histograms (one per stage).
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        // Pending at 1s, observed at 4s => 3s oldest-pending age below.
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
2211}