1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod effect_policy;
32pub mod harn_module;
33pub mod hmac;
34pub mod shared;
35pub mod stream;
36#[cfg(test)]
37pub(crate) mod test_util;
38pub mod testkit;
39pub mod webhook;
40
41pub use a2a_push::A2aPushConnector;
42pub use cron::{CatchupMode, CronConnector};
43pub use effect_policy::{
44 connector_export_denied_builtin_reason, connector_export_effect_class,
45 default_connector_export_policy, ConnectorExportEffectClass, HarnConnectorEffectPolicies,
46};
47pub use harn_module::{
48 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
49};
50pub use hmac::{
51 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
52 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
53 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
54 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
55 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
56 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
57 SIGNATURE_VERIFY_AUDIT_TOPIC,
58};
59pub use shared::{
60 paginate_cursor, resolve_jwks, verify_hmac_signature, verify_jwt_claims, verify_jwt_json,
61 ConnectorBase, CursorPage, HmacSignatureAlgorithm, JwtKeySource, JwtVerificationOptions,
62};
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
67const OUTBOUND_CONNECTOR_HTTP_TIMEOUT: StdDuration = StdDuration::from_secs(30);
68
69pub(crate) fn outbound_http_client(user_agent: &'static str) -> reqwest::Client {
70 reqwest::Client::builder()
71 .user_agent(user_agent)
72 .timeout(OUTBOUND_CONNECTOR_HTTP_TIMEOUT)
73 .redirect(reqwest::redirect::Policy::custom(|attempt| {
74 if attempt.previous().len() >= 10 {
75 attempt.error("too many redirects")
76 } else if crate::egress::redirect_url_allowed(
77 "connector_redirect",
78 attempt.url().as_str(),
79 ) {
80 attempt.follow()
81 } else {
82 attempt.error("egress policy blocked redirect target")
83 }
84 }))
85 .build()
86 .expect("connector HTTP client configuration should be valid")
87}
88
/// Shared, reference-counted handle to a boxed [`Connector`] behind an async mutex.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;

thread_local! {
    // Per-thread map from a string key to a connector client.
    // NOTE(review): presumably keyed by provider id — the code that fills this
    // map is not visible here; confirm against its users.
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
96
97#[async_trait]
99pub trait Connector: Send + Sync {
100 fn provider_id(&self) -> &ProviderId;
102
103 fn kinds(&self) -> &[TriggerKind];
105
106 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;
108
109 async fn activate(
111 &self,
112 bindings: &[TriggerBinding],
113 ) -> Result<ActivationHandle, ConnectorError>;
114
115 async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
117 Ok(())
118 }
119
120 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;
122
123 async fn normalize_inbound_result(
126 &self,
127 raw: RawInbound,
128 ) -> Result<ConnectorNormalizeResult, ConnectorError> {
129 self.normalize_inbound(raw)
130 .await
131 .map(ConnectorNormalizeResult::event)
132 }
133
134 fn payload_schema(&self) -> ProviderPayloadSchema;
136
137 fn client(&self) -> Arc<dyn ConnectorClient>;
139}
140
141#[derive(Clone, Debug, PartialEq)]
143pub struct ConnectorHttpResponse {
144 pub status: u16,
145 pub headers: BTreeMap<String, String>,
146 pub body: JsonValue,
147}
148
149impl ConnectorHttpResponse {
150 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
151 Self {
152 status,
153 headers,
154 body,
155 }
156 }
157}
158
159#[derive(Clone, Debug, PartialEq)]
161pub enum ConnectorNormalizeResult {
162 Event(Box<TriggerEvent>),
163 Batch(Vec<TriggerEvent>),
164 ImmediateResponse {
165 response: ConnectorHttpResponse,
166 events: Vec<TriggerEvent>,
167 },
168 Reject(ConnectorHttpResponse),
169}
170
171impl ConnectorNormalizeResult {
172 pub fn event(event: TriggerEvent) -> Self {
173 Self::Event(Box::new(event))
174 }
175
176 pub fn into_events(self) -> Vec<TriggerEvent> {
177 match self {
178 Self::Event(event) => vec![*event],
179 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
180 Self::Reject(_) => Vec::new(),
181 }
182 }
183}
184
/// Result of post-processing a normalized event.
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event may proceed; it is carried boxed.
    Ready(Box<TriggerEvent>),
    /// The event was recognised as a duplicate and dropped.
    DuplicateDropped,
}
190
191pub async fn postprocess_normalized_event(
192 inbox: &InboxIndex,
193 binding_id: &str,
194 dedupe_enabled: bool,
195 dedupe_ttl: StdDuration,
196 mut event: TriggerEvent,
197) -> Result<PostNormalizeOutcome, ConnectorError> {
198 if dedupe_enabled && !event.dedupe_claimed() {
199 if !inbox
200 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
201 .await?
202 {
203 return Ok(PostNormalizeOutcome::DuplicateDropped);
204 }
205 event.mark_dedupe_claimed();
206 }
207
208 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
209}
210
/// Client interface exposed by a connector: a single method-name/JSON-args call.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Invokes `method` with JSON `args` and returns a JSON value or a [`ClientError`].
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
216
217#[derive(Clone, Debug, PartialEq, Eq)]
219pub enum ClientError {
220 MethodNotFound(String),
221 InvalidArgs(String),
222 RateLimited(String),
223 Transport(String),
224 EgressBlocked(crate::egress::EgressBlocked),
225 Other(String),
226}
227
228impl fmt::Display for ClientError {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 match self {
231 Self::MethodNotFound(message)
232 | Self::InvalidArgs(message)
233 | Self::RateLimited(message)
234 | Self::Transport(message)
235 | Self::Other(message) => message.fmt(f),
236 Self::EgressBlocked(blocked) => blocked.fmt(f),
237 }
238 }
239}
240
241impl std::error::Error for ClientError {}
242
243#[derive(Debug)]
245pub enum ConnectorError {
246 DuplicateProvider(String),
247 DuplicateDelivery(String),
248 UnknownProvider(String),
249 MissingHeader(String),
250 InvalidHeader {
251 name: String,
252 detail: String,
253 },
254 InvalidSignature(String),
255 TimestampOutOfWindow {
256 timestamp: OffsetDateTime,
257 now: OffsetDateTime,
258 window: time::Duration,
259 },
260 Json(String),
261 Secret(String),
262 EventLog(String),
263 HarnRuntime(String),
264 Client(ClientError),
265 Unsupported(String),
266 Activation(String),
267}
268
269impl ConnectorError {
270 pub fn invalid_signature(message: impl Into<String>) -> Self {
271 Self::InvalidSignature(message.into())
272 }
273}
274
275impl fmt::Display for ConnectorError {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 match self {
278 Self::DuplicateProvider(provider) => {
279 write!(f, "connector provider `{provider}` is already registered")
280 }
281 Self::DuplicateDelivery(message) => message.fmt(f),
282 Self::UnknownProvider(provider) => {
283 write!(f, "connector provider `{provider}` is not registered")
284 }
285 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
286 Self::InvalidHeader { name, detail } => {
287 write!(f, "invalid header `{name}`: {detail}")
288 }
289 Self::InvalidSignature(message)
290 | Self::Json(message)
291 | Self::Secret(message)
292 | Self::EventLog(message)
293 | Self::HarnRuntime(message)
294 | Self::Unsupported(message)
295 | Self::Activation(message) => message.fmt(f),
296 Self::TimestampOutOfWindow {
297 timestamp,
298 now,
299 window,
300 } => write!(
301 f,
302 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
303 ),
304 Self::Client(error) => error.fmt(f),
305 }
306 }
307}
308
309impl std::error::Error for ConnectorError {}
310
311impl From<crate::event_log::LogError> for ConnectorError {
312 fn from(value: crate::event_log::LogError) -> Self {
313 Self::EventLog(value.to_string())
314 }
315}
316
317impl From<crate::secrets::SecretError> for ConnectorError {
318 fn from(value: crate::secrets::SecretError) -> Self {
319 Self::Secret(value.to_string())
320 }
321}
322
323impl From<serde_json::Error> for ConnectorError {
324 fn from(value: serde_json::Error) -> Self {
325 Self::Json(value.to_string())
326 }
327}
328
329impl From<ClientError> for ConnectorError {
330 fn from(value: ClientError) -> Self {
331 Self::Client(value)
332 }
333}
334
/// Shared services handed to a connector in [`Connector::init`].
///
/// Every field is an `Arc`, so cloning the context only bumps reference counts.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Secret provider.
    pub secrets: Arc<dyn SecretProvider>,
    /// Inbox index (used for de-duplication by `postprocess_normalized_event`).
    pub inbox: Arc<InboxIndex>,
    /// Metrics registry.
    pub metrics: Arc<MetricsRegistry>,
    /// Rate limiter factory.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
344
/// Plain-value copy of the built-in connector counters.
///
/// Field names mirror the atomic counters of `MetricsRegistry`; all fields
/// default to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    pub inbox_claims_written: u64,
    pub inbox_duplicates_rejected: u64,
    pub inbox_fast_path_hits: u64,
    pub inbox_durable_hits: u64,
    pub inbox_expired_entries: u64,
    pub inbox_active_entries: u64,
    pub linear_timestamp_rejections_total: u64,
    pub dispatch_succeeded_total: u64,
    pub dispatch_failed_total: u64,
    pub retry_scheduled_total: u64,
    pub slack_delivery_success_total: u64,
    pub slack_delivery_failure_total: u64,
}
361
/// Label set attached to one metric sample (label name -> label value).
type MetricLabels = BTreeMap<String, String>;

/// Accumulated state of one histogram series.
#[derive(Debug, Clone, Default, PartialEq)]
struct HistogramMetric {
    /// Bucket counters keyed by the string form of the upper bound.
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
370
371static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
372
373pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
374 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
375 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
376}
377
378pub fn clear_active_metrics_registry() {
379 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
380 *slot.lock().expect("active metrics registry poisoned") = None;
381 }
382}
383
384pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
385 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
386 slot.lock()
387 .expect("active metrics registry poisoned")
388 .clone()
389 })
390}
391
/// In-process metrics store for connectors and triggers.
///
/// Holds a fixed set of atomic counters plus mutex-guarded maps for custom
/// counters, labelled counters, gauges and histograms.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Built-in counters; read together by `snapshot`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    // Stored (not accumulated) by `set_inbox_active_entries`.
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Free-form counters keyed by caller-supplied name.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled series keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Per label set: event id -> accepted-at timestamp in milliseconds.
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
413
414impl MetricsRegistry {
    /// Bucket upper bounds used by the duration histograms (values recorded in seconds).
    const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
    /// Bucket upper bounds used by the trigger lifecycle histograms (values recorded in seconds).
    const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
    ];
    /// Bucket upper bounds used by the HTTP body size histogram (values recorded in bytes).
    const SIZE_BUCKETS: [f64; 9] = [
        128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
    ];
422
423 pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
424 ConnectorMetricsSnapshot {
425 inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
426 inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
427 inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
428 inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
429 inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
430 inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
431 linear_timestamp_rejections_total: self
432 .linear_timestamp_rejections_total
433 .load(Ordering::Relaxed),
434 dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
435 dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
436 retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
437 slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
438 slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
439 }
440 }
441
442 pub(crate) fn record_inbox_claim(&self) {
443 self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
444 }
445
446 pub(crate) fn record_inbox_duplicate_fast_path(&self) {
447 self.inbox_duplicates_rejected
448 .fetch_add(1, Ordering::Relaxed);
449 self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
450 }
451
452 pub(crate) fn record_inbox_duplicate_durable(&self) {
453 self.inbox_duplicates_rejected
454 .fetch_add(1, Ordering::Relaxed);
455 self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
456 }
457
458 pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
459 if count > 0 {
460 self.inbox_expired_entries
461 .fetch_add(count, Ordering::Relaxed);
462 }
463 }
464
465 pub(crate) fn set_inbox_active_entries(&self, count: usize) {
466 self.inbox_active_entries
467 .store(count as u64, Ordering::Relaxed);
468 }
469
470 pub fn record_linear_timestamp_rejection(&self) {
471 self.linear_timestamp_rejections_total
472 .fetch_add(1, Ordering::Relaxed);
473 }
474
475 pub fn record_dispatch_succeeded(&self) {
476 self.dispatch_succeeded_total
477 .fetch_add(1, Ordering::Relaxed);
478 }
479
480 pub fn record_dispatch_failed(&self) {
481 self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
482 }
483
484 pub fn record_retry_scheduled(&self) {
485 self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
486 }
487
488 pub fn record_slack_delivery_success(&self) {
489 self.slack_delivery_success_total
490 .fetch_add(1, Ordering::Relaxed);
491 }
492
493 pub fn record_slack_delivery_failure(&self) {
494 self.slack_delivery_failure_total
495 .fetch_add(1, Ordering::Relaxed);
496 }
497
498 pub fn record_custom_counter(&self, name: &str, amount: u64) {
499 if amount == 0 {
500 return;
501 }
502 let mut counters = self
503 .custom_counters
504 .lock()
505 .expect("custom counters poisoned");
506 *counters.entry(name.to_string()).or_default() += amount;
507 }
508
509 pub fn record_http_request(
510 &self,
511 endpoint: &str,
512 method: &str,
513 status: u16,
514 duration: StdDuration,
515 body_size_bytes: usize,
516 ) {
517 self.increment_counter(
518 "harn_http_requests_total",
519 labels([
520 ("endpoint", endpoint),
521 ("method", method),
522 ("status", &status.to_string()),
523 ]),
524 1,
525 );
526 self.observe_histogram(
527 "harn_http_request_duration_seconds",
528 labels([("endpoint", endpoint)]),
529 duration.as_secs_f64(),
530 &Self::DURATION_BUCKETS,
531 );
532 self.observe_histogram(
533 "harn_http_body_size_bytes",
534 labels([("endpoint", endpoint)]),
535 body_size_bytes as f64,
536 &Self::SIZE_BUCKETS,
537 );
538 }
539
540 pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
541 self.increment_counter(
542 "harn_trigger_received_total",
543 labels([("trigger_id", trigger_id), ("provider", provider)]),
544 1,
545 );
546 }
547
548 pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
549 self.increment_counter(
550 "harn_trigger_deduped_total",
551 labels([("trigger_id", trigger_id), ("reason", reason)]),
552 1,
553 );
554 }
555
556 pub fn record_trigger_predicate_evaluation(
557 &self,
558 trigger_id: &str,
559 result: bool,
560 cost_usd: f64,
561 ) {
562 self.increment_counter(
563 "harn_trigger_predicate_evaluations_total",
564 labels([
565 ("trigger_id", trigger_id),
566 ("result", if result { "true" } else { "false" }),
567 ]),
568 1,
569 );
570 self.observe_histogram(
571 "harn_trigger_predicate_cost_usd",
572 labels([("trigger_id", trigger_id)]),
573 cost_usd.max(0.0),
574 &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
575 );
576 }
577
578 pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
579 self.increment_counter(
580 "harn_trigger_dispatched_total",
581 labels([
582 ("trigger_id", trigger_id),
583 ("handler_kind", handler_kind),
584 ("outcome", outcome),
585 ]),
586 1,
587 );
588 }
589
590 pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
591 self.increment_counter(
592 "harn_trigger_retries_total",
593 labels([
594 ("trigger_id", trigger_id),
595 ("attempt", &attempt.to_string()),
596 ]),
597 1,
598 );
599 }
600
601 pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
602 self.increment_counter(
603 "harn_trigger_dlq_total",
604 labels([("trigger_id", trigger_id), ("reason", reason)]),
605 1,
606 );
607 }
608
609 pub fn record_trigger_accepted_to_normalized(
610 &self,
611 trigger_id: &str,
612 binding_key: &str,
613 provider: &str,
614 tenant_id: Option<&str>,
615 status: &str,
616 duration: StdDuration,
617 ) {
618 self.observe_histogram(
619 "harn_trigger_webhook_accepted_to_normalized_seconds",
620 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
621 duration.as_secs_f64(),
622 &Self::TRIGGER_LATENCY_BUCKETS,
623 );
624 }
625
626 pub fn record_trigger_accepted_to_queue_append(
627 &self,
628 trigger_id: &str,
629 binding_key: &str,
630 provider: &str,
631 tenant_id: Option<&str>,
632 status: &str,
633 duration: StdDuration,
634 ) {
635 self.observe_histogram(
636 "harn_trigger_webhook_accepted_to_queue_append_seconds",
637 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
638 duration.as_secs_f64(),
639 &Self::TRIGGER_LATENCY_BUCKETS,
640 );
641 }
642
643 pub fn record_trigger_queue_age_at_dispatch_admission(
644 &self,
645 trigger_id: &str,
646 binding_key: &str,
647 provider: &str,
648 tenant_id: Option<&str>,
649 status: &str,
650 age: StdDuration,
651 ) {
652 self.observe_histogram(
653 "harn_trigger_queue_age_at_dispatch_admission_seconds",
654 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
655 age.as_secs_f64(),
656 &Self::TRIGGER_LATENCY_BUCKETS,
657 );
658 }
659
660 pub fn record_trigger_queue_age_at_dispatch_start(
661 &self,
662 trigger_id: &str,
663 binding_key: &str,
664 provider: &str,
665 tenant_id: Option<&str>,
666 status: &str,
667 age: StdDuration,
668 ) {
669 self.observe_histogram(
670 "harn_trigger_queue_age_at_dispatch_start_seconds",
671 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
672 age.as_secs_f64(),
673 &Self::TRIGGER_LATENCY_BUCKETS,
674 );
675 }
676
677 pub fn record_trigger_dispatch_runtime(
678 &self,
679 trigger_id: &str,
680 binding_key: &str,
681 provider: &str,
682 tenant_id: Option<&str>,
683 status: &str,
684 duration: StdDuration,
685 ) {
686 self.observe_histogram(
687 "harn_trigger_dispatch_runtime_seconds",
688 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
689 duration.as_secs_f64(),
690 &Self::TRIGGER_LATENCY_BUCKETS,
691 );
692 }
693
694 pub fn record_trigger_retry_delay(
695 &self,
696 trigger_id: &str,
697 binding_key: &str,
698 provider: &str,
699 tenant_id: Option<&str>,
700 status: &str,
701 duration: StdDuration,
702 ) {
703 self.observe_histogram(
704 "harn_trigger_retry_delay_seconds",
705 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
706 duration.as_secs_f64(),
707 &Self::TRIGGER_LATENCY_BUCKETS,
708 );
709 }
710
711 pub fn record_trigger_accepted_to_dlq(
712 &self,
713 trigger_id: &str,
714 binding_key: &str,
715 provider: &str,
716 tenant_id: Option<&str>,
717 status: &str,
718 duration: StdDuration,
719 ) {
720 self.observe_histogram(
721 "harn_trigger_accepted_to_dlq_seconds",
722 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
723 duration.as_secs_f64(),
724 &Self::TRIGGER_LATENCY_BUCKETS,
725 );
726 }
727
728 pub fn note_trigger_pending_event(
729 &self,
730 event_id: &str,
731 trigger_id: &str,
732 binding_key: &str,
733 provider: &str,
734 tenant_id: Option<&str>,
735 accepted_at_ms: i64,
736 now_ms: i64,
737 ) {
738 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
739 {
740 let mut pending = self
741 .pending_trigger_events
742 .lock()
743 .expect("pending trigger events poisoned");
744 pending
745 .entry(labels.clone())
746 .or_default()
747 .insert(event_id.to_string(), accepted_at_ms);
748 }
749 self.refresh_oldest_pending_gauge(labels, now_ms);
750 }
751
752 pub fn clear_trigger_pending_event(
753 &self,
754 event_id: &str,
755 trigger_id: &str,
756 binding_key: &str,
757 provider: &str,
758 tenant_id: Option<&str>,
759 now_ms: i64,
760 ) {
761 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
762 {
763 let mut pending = self
764 .pending_trigger_events
765 .lock()
766 .expect("pending trigger events poisoned");
767 if let Some(events) = pending.get_mut(&labels) {
768 events.remove(event_id);
769 if events.is_empty() {
770 pending.remove(&labels);
771 }
772 }
773 }
774 self.refresh_oldest_pending_gauge(labels, now_ms);
775 }
776
777 pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
778 self.set_gauge(
779 "harn_trigger_inflight",
780 labels([("trigger_id", trigger_id)]),
781 count as f64,
782 );
783 }
784
785 pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
786 self.set_gauge(
787 "harn_trigger_budget_cost_today_usd",
788 labels([("trigger_id", trigger_id)]),
789 cost_usd.max(0.0),
790 );
791 }
792
793 pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
794 self.increment_counter(
795 "harn_trigger_budget_exhausted_total",
796 labels([("trigger_id", trigger_id), ("strategy", strategy)]),
797 1,
798 );
799 }
800
801 pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
802 self.increment_counter(
803 "harn_backpressure_events_total",
804 labels([("dimension", dimension), ("action", action)]),
805 1,
806 );
807 }
808
809 pub fn record_event_log_append(
810 &self,
811 topic: &str,
812 duration: StdDuration,
813 payload_bytes: usize,
814 ) {
815 self.observe_histogram(
816 "harn_event_log_append_duration_seconds",
817 labels([("topic", topic)]),
818 duration.as_secs_f64(),
819 &Self::DURATION_BUCKETS,
820 );
821 self.set_gauge(
822 "harn_event_log_topic_size_bytes",
823 labels([("topic", topic)]),
824 payload_bytes as f64,
825 );
826 }
827
828 pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
829 self.set_gauge(
830 "harn_event_log_consumer_lag",
831 labels([("topic", topic), ("consumer", consumer)]),
832 lag as f64,
833 );
834 }
835
836 pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
837 self.increment_counter(
838 "harn_a2a_hops_total",
839 labels([("target", target), ("outcome", outcome)]),
840 1,
841 );
842 self.observe_histogram(
843 "harn_a2a_hop_duration_seconds",
844 labels([("target", target)]),
845 duration.as_secs_f64(),
846 &Self::DURATION_BUCKETS,
847 );
848 }
849
850 pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
851 self.set_gauge(
852 "harn_worker_queue_depth",
853 labels([("queue", queue)]),
854 depth as f64,
855 );
856 }
857
858 pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
859 self.observe_histogram(
860 "harn_worker_queue_claim_age_seconds",
861 labels([("queue", queue)]),
862 age_seconds.max(0.0),
863 &Self::DURATION_BUCKETS,
864 );
865 }
866
867 pub fn record_scheduler_selection(
869 &self,
870 queue: &str,
871 fairness_dimension: &str,
872 fairness_key: &str,
873 ) {
874 self.increment_counter(
875 "harn_scheduler_selections_total",
876 labels([
877 ("queue", queue),
878 ("fairness_dimension", fairness_dimension),
879 ("fairness_key", fairness_key),
880 ]),
881 1,
882 );
883 }
884
885 pub fn record_scheduler_deferral(
888 &self,
889 queue: &str,
890 fairness_dimension: &str,
891 fairness_key: &str,
892 ) {
893 self.increment_counter(
894 "harn_scheduler_deferrals_total",
895 labels([
896 ("queue", queue),
897 ("fairness_dimension", fairness_dimension),
898 ("fairness_key", fairness_key),
899 ]),
900 1,
901 );
902 }
903
904 pub fn record_scheduler_starvation_promotion(
906 &self,
907 queue: &str,
908 fairness_dimension: &str,
909 fairness_key: &str,
910 ) {
911 self.increment_counter(
912 "harn_scheduler_starvation_promotions_total",
913 labels([
914 ("queue", queue),
915 ("fairness_dimension", fairness_dimension),
916 ("fairness_key", fairness_key),
917 ]),
918 1,
919 );
920 }
921
922 pub fn set_scheduler_deficit(
924 &self,
925 queue: &str,
926 fairness_dimension: &str,
927 fairness_key: &str,
928 deficit: i64,
929 ) {
930 self.set_gauge(
931 "harn_scheduler_deficit",
932 labels([
933 ("queue", queue),
934 ("fairness_dimension", fairness_dimension),
935 ("fairness_key", fairness_key),
936 ]),
937 deficit as f64,
938 );
939 }
940
941 pub fn set_scheduler_oldest_eligible_age(
943 &self,
944 queue: &str,
945 fairness_dimension: &str,
946 fairness_key: &str,
947 age_ms: u64,
948 ) {
949 self.set_gauge(
950 "harn_scheduler_oldest_eligible_age_seconds",
951 labels([
952 ("queue", queue),
953 ("fairness_dimension", fairness_dimension),
954 ("fairness_key", fairness_key),
955 ]),
956 age_ms as f64 / 1000.0,
957 );
958 }
959
960 pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
961 self.set_gauge(
962 "harn_orchestrator_pump_backlog",
963 labels([("topic", topic)]),
964 count as f64,
965 );
966 }
967
968 pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
969 self.set_gauge(
970 "harn_orchestrator_pump_outstanding",
971 labels([("topic", topic)]),
972 count as f64,
973 );
974 }
975
976 pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
977 self.observe_histogram(
978 "harn_orchestrator_pump_admission_delay_seconds",
979 labels([("topic", topic)]),
980 duration.as_secs_f64(),
981 &Self::DURATION_BUCKETS,
982 );
983 }
984
985 pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
986 self.increment_counter(
987 "harn_llm_calls_total",
988 labels([
989 ("provider", provider),
990 ("model", model),
991 ("outcome", outcome),
992 ]),
993 1,
994 );
995 if cost_usd > 0.0 {
996 self.increment_counter(
997 "harn_llm_cost_usd_total",
998 labels([("provider", provider), ("model", model)]),
999 cost_usd,
1000 );
1001 } else {
1002 self.ensure_counter(
1003 "harn_llm_cost_usd_total",
1004 labels([("provider", provider), ("model", model)]),
1005 );
1006 }
1007 }
1008
1009 pub fn record_llm_cache_hit(&self, provider: &str) {
1010 self.increment_counter(
1011 "harn_llm_cache_hits_total",
1012 labels([("provider", provider)]),
1013 1,
1014 );
1015 }
1016
1017 pub fn record_schema_stream_aborted(&self, provider: &str, model: &str) {
1023 self.increment_counter(
1024 "harn_llm_schema_stream_aborted_total",
1025 labels([("provider", provider), ("model", model)]),
1026 1,
1027 );
1028 }
1029
1030 pub fn render_prometheus(&self) -> String {
1031 let snapshot = self.snapshot();
1032 let counters = [
1033 (
1034 "connector_linear_timestamp_rejections_total",
1035 snapshot.linear_timestamp_rejections_total,
1036 ),
1037 (
1038 "dispatch_succeeded_total",
1039 snapshot.dispatch_succeeded_total,
1040 ),
1041 ("dispatch_failed_total", snapshot.dispatch_failed_total),
1042 ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
1043 ("retry_scheduled_total", snapshot.retry_scheduled_total),
1044 (
1045 "slack_events_delivery_success_total",
1046 snapshot.slack_delivery_success_total,
1047 ),
1048 (
1049 "slack_events_delivery_failure_total",
1050 snapshot.slack_delivery_failure_total,
1051 ),
1052 ];
1053
1054 let mut rendered = String::new();
1055 for (name, value) in counters {
1056 rendered.push_str("# TYPE ");
1057 rendered.push_str(name);
1058 rendered.push_str(" counter\n");
1059 rendered.push_str(name);
1060 rendered.push(' ');
1061 rendered.push_str(&value.to_string());
1062 rendered.push('\n');
1063 }
1064 let custom_counters = self
1065 .custom_counters
1066 .lock()
1067 .expect("custom counters poisoned");
1068 for (name, value) in custom_counters.iter() {
1069 let metric_name = format!(
1070 "connector_custom_{}_total",
1071 name.chars()
1072 .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
1073 ch
1074 } else {
1075 '_'
1076 })
1077 .collect::<String>()
1078 );
1079 rendered.push_str("# TYPE ");
1080 rendered.push_str(&metric_name);
1081 rendered.push_str(" counter\n");
1082 rendered.push_str(&metric_name);
1083 rendered.push(' ');
1084 rendered.push_str(&value.to_string());
1085 rendered.push('\n');
1086 }
1087 rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
1088 rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
1089 rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
1090 rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
1091 self.render_generic_metrics(&mut rendered);
1092 rendered
1093 }
1094
1095 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
1096 let amount = amount.into();
1097 if amount <= 0.0 || !amount.is_finite() {
1098 return;
1099 }
1100 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1101 *counters.entry((name.to_string(), labels)).or_default() += amount;
1102 }
1103
1104 fn ensure_counter(&self, name: &str, labels: MetricLabels) {
1105 let mut counters = self.counters.lock().expect("metrics counters poisoned");
1106 counters.entry((name.to_string(), labels)).or_default();
1107 }
1108
1109 fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
1110 let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
1111 gauges.insert((name.to_string(), labels), value);
1112 }
1113
1114 fn observe_histogram(
1115 &self,
1116 name: &str,
1117 labels: MetricLabels,
1118 value: f64,
1119 bucket_bounds: &[f64],
1120 ) {
1121 if !value.is_finite() {
1122 return;
1123 }
1124 let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
1125 let histogram = histograms
1126 .entry((name.to_string(), labels))
1127 .or_insert_with(|| HistogramMetric {
1128 buckets: bucket_bounds
1129 .iter()
1130 .map(|bound| (prometheus_float(*bound), 0))
1131 .chain(std::iter::once(("+Inf".to_string(), 0)))
1132 .collect(),
1133 count: 0,
1134 sum: 0.0,
1135 });
1136 histogram.count += 1;
1137 histogram.sum += value;
1138 for bound in bucket_bounds {
1139 if value <= *bound {
1140 let key = prometheus_float(*bound);
1141 *histogram.buckets.entry(key).or_default() += 1;
1142 }
1143 }
1144 *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1145 }
1146
1147 fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1148 let oldest_accepted_at_ms = self
1149 .pending_trigger_events
1150 .lock()
1151 .expect("pending trigger events poisoned")
1152 .get(&labels)
1153 .and_then(|events| events.values().min().copied());
1154 let age_seconds = oldest_accepted_at_ms
1155 .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1156 .unwrap_or(0.0);
1157 self.set_gauge(
1158 "harn_trigger_oldest_pending_age_seconds",
1159 labels,
1160 age_seconds,
1161 );
1162 }
1163
1164 fn render_generic_metrics(&self, rendered: &mut String) {
1165 let counters = self
1166 .counters
1167 .lock()
1168 .expect("metrics counters poisoned")
1169 .clone();
1170 let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1171 let histograms = self
1172 .histograms
1173 .lock()
1174 .expect("metrics histograms poisoned")
1175 .clone();
1176
1177 for name in metric_family_names(MetricKind::Counter) {
1178 rendered.push_str("# TYPE ");
1179 rendered.push_str(name);
1180 rendered.push_str(" counter\n");
1181 for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1182 render_sample(rendered, sample_name, labels, *value);
1183 }
1184 }
1185 for name in metric_family_names(MetricKind::Gauge) {
1186 rendered.push_str("# TYPE ");
1187 rendered.push_str(name);
1188 rendered.push_str(" gauge\n");
1189 for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1190 render_sample(rendered, sample_name, labels, *value);
1191 }
1192 }
1193 for name in metric_family_names(MetricKind::Histogram) {
1194 rendered.push_str("# TYPE ");
1195 rendered.push_str(name);
1196 rendered.push_str(" histogram\n");
1197 for ((sample_name, labels), histogram) in
1198 histograms.iter().filter(|((n, _), _)| n == name)
1199 {
1200 for (le, value) in &histogram.buckets {
1201 let mut bucket_labels = labels.clone();
1202 bucket_labels.insert("le".to_string(), le.clone());
1203 render_sample(
1204 rendered,
1205 &format!("{sample_name}_bucket"),
1206 &bucket_labels,
1207 *value as f64,
1208 );
1209 }
1210 render_sample(
1211 rendered,
1212 &format!("{sample_name}_sum"),
1213 labels,
1214 histogram.sum,
1215 );
1216 render_sample(
1217 rendered,
1218 &format!("{sample_name}_count"),
1219 labels,
1220 histogram.count as f64,
1221 );
1222 }
1223 }
1224 }
1225}
1226
/// Metric type used to pick a family-name list in `metric_family_names`.
#[derive(Clone, Copy)]
enum MetricKind {
    /// Families announced with `# TYPE <name> counter`.
    Counter,
    /// Families announced with `# TYPE <name> gauge`.
    Gauge,
    /// Families announced with `# TYPE <name> histogram`.
    Histogram,
}
1233
1234fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
1235 match kind {
1236 MetricKind::Counter => &[
1237 "harn_http_requests_total",
1238 "harn_trigger_received_total",
1239 "harn_trigger_deduped_total",
1240 "harn_trigger_predicate_evaluations_total",
1241 "harn_trigger_dispatched_total",
1242 "harn_trigger_retries_total",
1243 "harn_trigger_dlq_total",
1244 "harn_trigger_budget_exhausted_total",
1245 "harn_backpressure_events_total",
1246 "harn_a2a_hops_total",
1247 "harn_llm_calls_total",
1248 "harn_llm_cost_usd_total",
1249 "harn_llm_cache_hits_total",
1250 "harn_llm_schema_stream_aborted_total",
1251 "harn_scheduler_selections_total",
1252 "harn_scheduler_deferrals_total",
1253 "harn_scheduler_starvation_promotions_total",
1254 ],
1255 MetricKind::Gauge => &[
1256 "harn_trigger_inflight",
1257 "harn_event_log_topic_size_bytes",
1258 "harn_event_log_consumer_lag",
1259 "harn_trigger_budget_cost_today_usd",
1260 "harn_worker_queue_depth",
1261 "harn_orchestrator_pump_backlog",
1262 "harn_orchestrator_pump_outstanding",
1263 "harn_trigger_oldest_pending_age_seconds",
1264 "harn_scheduler_deficit",
1265 "harn_scheduler_oldest_eligible_age_seconds",
1266 ],
1267 MetricKind::Histogram => &[
1268 "harn_http_request_duration_seconds",
1269 "harn_http_body_size_bytes",
1270 "harn_trigger_predicate_cost_usd",
1271 "harn_event_log_append_duration_seconds",
1272 "harn_a2a_hop_duration_seconds",
1273 "harn_worker_queue_claim_age_seconds",
1274 "harn_orchestrator_pump_admission_delay_seconds",
1275 "harn_trigger_webhook_accepted_to_normalized_seconds",
1276 "harn_trigger_webhook_accepted_to_queue_append_seconds",
1277 "harn_trigger_queue_age_at_dispatch_admission_seconds",
1278 "harn_trigger_queue_age_at_dispatch_start_seconds",
1279 "harn_trigger_dispatch_runtime_seconds",
1280 "harn_trigger_retry_delay_seconds",
1281 "harn_trigger_accepted_to_dlq_seconds",
1282 ],
1283 }
1284}
1285
1286fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1287 pairs
1288 .into_iter()
1289 .map(|(name, value)| (name.to_string(), value.to_string()))
1290 .collect()
1291}
1292
1293fn trigger_lifecycle_labels(
1294 trigger_id: &str,
1295 binding_key: &str,
1296 provider: &str,
1297 tenant_id: Option<&str>,
1298 status: &str,
1299) -> MetricLabels {
1300 labels([
1301 ("binding_key", binding_key),
1302 ("provider", provider),
1303 ("status", status),
1304 ("tenant_id", tenant_label(tenant_id)),
1305 ("trigger_id", trigger_id),
1306 ])
1307}
1308
1309fn trigger_pending_labels(
1310 trigger_id: &str,
1311 binding_key: &str,
1312 provider: &str,
1313 tenant_id: Option<&str>,
1314) -> MetricLabels {
1315 labels([
1316 ("binding_key", binding_key),
1317 ("provider", provider),
1318 ("tenant_id", tenant_label(tenant_id)),
1319 ("trigger_id", trigger_id),
1320 ])
1321}
1322
/// Returns the tenant id trimmed of surrounding whitespace, or `"none"` when
/// the id is absent or blank.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1329
/// Returns `later_ms - earlier_ms` as a duration, clamped to zero when
/// `earlier_ms` is after `later_ms`. The subtraction saturates instead of
/// overflowing.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    // After clamping to zero the value is non-negative, so its absolute value
    // is the value itself.
    StdDuration::from_millis(elapsed_ms.max(0).unsigned_abs())
}
1333
1334fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1335 rendered.push_str(name);
1336 if !labels.is_empty() {
1337 rendered.push('{');
1338 for (index, (label, label_value)) in labels.iter().enumerate() {
1339 if index > 0 {
1340 rendered.push(',');
1341 }
1342 rendered.push_str(label);
1343 rendered.push_str("=\"");
1344 rendered.push_str(&escape_label_value(label_value));
1345 rendered.push('"');
1346 }
1347 rendered.push('}');
1348 }
1349 rendered.push(' ');
1350 rendered.push_str(&prometheus_float(value));
1351 rendered.push('\n');
1352}
1353
/// Escapes a label value for the text exposition output: backslash becomes
/// `\\`, double quote becomes `\"` and line feed becomes `\n`. All other
/// characters are copied unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1365
/// Formats an `f64` as a sample value or `le` bound for the Prometheus text
/// exposition output.
///
/// * `NaN`, `+Inf` and `-Inf` use the spellings of the exposition format.
/// * Whole numbers are printed without a fractional part (`2048`).
/// * Any other value is rounded to six decimal places and trailing zeros are
///   removed (`0.002`).
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // `{:.6}` prints negative infinity as `-inf`; emit the canonical
        // `-Inf` spelling instead, mirroring the existing `+Inf` case.
        let spelled = if value.is_sign_positive() {
            "+Inf"
        } else {
            "-Inf"
        };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        return format!("{value:.0}");
    }
    format!("{value:.6}")
        .trim_end_matches('0')
        .trim_end_matches('.')
        .to_string()
}
1380
/// Describes a provider payload by Harn schema name, optionally accompanied by
/// a JSON value (presumably a JSON Schema document — confirm against callers).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Name of the Harn schema for the payload.
    pub harn_schema_name: String,
    /// Schema value; takes the type's default when the field is missing from
    /// the deserialized input.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1388
1389impl ProviderPayloadSchema {
1390 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1391 Self {
1392 harn_schema_name: harn_schema_name.into(),
1393 json_schema,
1394 }
1395 }
1396
1397 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1398 Self::new(harn_schema_name, JsonValue::Null)
1399 }
1400}
1401
1402impl Default for ProviderPayloadSchema {
1403 fn default() -> Self {
1404 Self::named("raw")
1405 }
1406}
1407
/// String-backed identifier of a trigger kind.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1412
1413impl TriggerKind {
1414 pub fn new(value: impl Into<String>) -> Self {
1415 Self(value.into())
1416 }
1417
1418 pub fn as_str(&self) -> &str {
1419 self.0.as_str()
1420 }
1421}
1422
1423impl From<&str> for TriggerKind {
1424 fn from(value: &str) -> Self {
1425 Self::new(value)
1426 }
1427}
1428
1429impl From<String> for TriggerKind {
1430 fn from(value: String) -> Self {
1431 Self::new(value)
1432 }
1433}
1434
/// One trigger binding: associates a provider and trigger kind with a binding
/// id plus dedupe and configuration settings.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding belongs to; `TriggerRegistry` groups bindings by it.
    pub provider: ProviderId,
    /// Kind of trigger being bound.
    pub kind: TriggerKind,
    /// Identifier of this binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` when missing from the deserialized input.
    // NOTE(review): how the key is interpreted is not visible here — confirm.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Dedupe retention in days; `default_dedupe_retention_days()` supplies
    /// the value when the field is missing from the deserialized input.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Free-form configuration value; takes the type's default when missing.
    #[serde(default)]
    pub config: JsonValue,
}
1448
1449impl TriggerBinding {
1450 pub fn new(
1451 provider: ProviderId,
1452 kind: impl Into<TriggerKind>,
1453 binding_id: impl Into<String>,
1454 ) -> Self {
1455 Self {
1456 provider,
1457 kind: kind.into(),
1458 binding_id: binding_id.into(),
1459 dedupe_key: None,
1460 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1461 config: JsonValue::Null,
1462 }
1463 }
1464}
1465
/// Serde default for `TriggerBinding::dedupe_retention_days`: the crate-wide
/// `DEFAULT_INBOX_RETENTION_DAYS`.
fn default_dedupe_retention_days() -> u32 {
    crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
}
1469
/// Collection of trigger bindings grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    /// Bindings per provider, in registration order within each provider.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1475
1476impl TriggerRegistry {
1477 pub fn register(&mut self, binding: TriggerBinding) {
1478 self.bindings
1479 .entry(binding.provider.clone())
1480 .or_default()
1481 .push(binding);
1482 }
1483
1484 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1485 &self.bindings
1486 }
1487
1488 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1489 self.bindings
1490 .get(provider)
1491 .map(Vec::as_slice)
1492 .unwrap_or(&[])
1493 }
1494}
1495
/// Value returned by `Connector::activate`: the provider and a binding count.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider the activation belongs to.
    pub provider: ProviderId,
    /// Number of bindings reported by the connector for this activation.
    pub binding_count: usize,
}
1502
1503impl ActivationHandle {
1504 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1505 Self {
1506 provider,
1507 binding_count,
1508 }
1509 }
1510}
1511
/// Raw inbound message handed to `Connector::normalize_inbound`.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Kind string supplied by the caller of `RawInbound::new`.
    pub kind: String,
    /// Headers as name/value pairs.
    pub headers: BTreeMap<String, String>,
    /// Query parameters as name/value pairs; empty after `RawInbound::new`.
    pub query: BTreeMap<String, String>,
    /// Unparsed body bytes; `json_body` parses these on demand.
    pub body: Vec<u8>,
    /// Set from `clock::now_utc()` by `RawInbound::new`.
    pub received_at: OffsetDateTime,
    /// Optional time of occurrence; `None` after `RawInbound::new`.
    // NOTE(review): presumably the provider-reported event time — confirm.
    pub occurred_at: Option<OffsetDateTime>,
    /// Optional tenant; `None` after `RawInbound::new`.
    pub tenant_id: Option<TenantId>,
    /// Extra metadata; `JsonValue::Null` after `RawInbound::new`.
    pub metadata: JsonValue,
}
1524
1525impl RawInbound {
1526 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1527 Self {
1528 kind: kind.into(),
1529 headers,
1530 query: BTreeMap::new(),
1531 body,
1532 received_at: clock::now_utc(),
1533 occurred_at: None,
1534 tenant_id: None,
1535 metadata: JsonValue::Null,
1536 }
1537 }
1538
1539 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1540 Ok(serde_json::from_slice(&self.body)?)
1541 }
1542}
1543
/// Token-bucket parameters shared by every bucket of a `RateLimiterFactory`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum number of tokens a bucket can hold; `TokenBucket::refill`
    /// treats values below 1 as 1.
    pub capacity: u32,
    /// Tokens added per `refill_interval`; `TokenBucket` treats values below 1
    /// as 1.
    pub refill_tokens: u32,
    /// Time over which `refill_tokens` tokens are added.
    pub refill_interval: StdDuration,
}
1551
1552impl Default for RateLimitConfig {
1553 fn default() -> Self {
1554 Self {
1555 capacity: 60,
1556 refill_tokens: 1,
1557 refill_interval: StdDuration::from_secs(1),
1558 }
1559 }
1560}
1561
/// State of a single token bucket.
#[derive(Clone, Debug)]
struct TokenBucket {
    /// Tokens currently available; fractional because refill is continuous.
    tokens: f64,
    /// Clock instant of the most recent refill.
    last_refill: ClockInstant,
}
1567
1568impl TokenBucket {
1569 fn full(config: RateLimitConfig) -> Self {
1570 Self {
1571 tokens: config.capacity as f64,
1572 last_refill: clock::instant_now(),
1573 }
1574 }
1575
1576 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1577 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1578 let rate = config.refill_tokens.max(1) as f64 / interval;
1579 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1580 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1581 self.last_refill = now;
1582 }
1583
1584 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1585 self.refill(config, now);
1586 if self.tokens >= 1.0 {
1587 self.tokens -= 1.0;
1588 true
1589 } else {
1590 false
1591 }
1592 }
1593
1594 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1595 if self.tokens >= 1.0 {
1596 return StdDuration::ZERO;
1597 }
1598 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1599 let rate = config.refill_tokens.max(1) as f64 / interval;
1600 let missing = (1.0 - self.tokens).max(0.0);
1601 StdDuration::from_secs_f64((missing / rate).max(0.001))
1602 }
1603}
1604
/// Owns one token bucket per `(provider, key)` scope, all sharing one config.
#[derive(Debug)]
pub struct RateLimiterFactory {
    /// Limits applied to every bucket.
    config: RateLimitConfig,
    /// Buckets keyed by `(provider id string, caller-chosen key)`; created
    /// lazily on first acquisition.
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1611
1612impl RateLimiterFactory {
1613 pub fn new(config: RateLimitConfig) -> Self {
1614 Self {
1615 config,
1616 buckets: Mutex::new(HashMap::new()),
1617 }
1618 }
1619
1620 pub fn config(&self) -> RateLimitConfig {
1621 self.config
1622 }
1623
1624 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1625 ScopedRateLimiter {
1626 factory: self,
1627 provider: provider.clone(),
1628 key: key.into(),
1629 }
1630 }
1631
1632 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1633 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1634 let bucket = buckets
1635 .entry((provider.as_str().to_string(), key.to_string()))
1636 .or_insert_with(|| TokenBucket::full(self.config));
1637 bucket.try_acquire(self.config, clock::instant_now())
1638 }
1639
1640 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1641 loop {
1642 let wait = {
1643 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1644 let bucket = buckets
1645 .entry((provider.as_str().to_string(), key.to_string()))
1646 .or_insert_with(|| TokenBucket::full(self.config));
1647 if bucket.try_acquire(self.config, clock::instant_now()) {
1648 return;
1649 }
1650 bucket.wait_duration(self.config)
1651 };
1652 clock::sleep(wait).await;
1657 }
1658 }
1659}
1660
1661impl Default for RateLimiterFactory {
1662 fn default() -> Self {
1663 Self::new(RateLimitConfig::default())
1664 }
1665}
1666
/// View of a `RateLimiterFactory` fixed to one `(provider, key)` scope.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    /// Factory that owns the buckets.
    factory: &'a RateLimiterFactory,
    /// Provider part of the scope.
    provider: ProviderId,
    /// Key part of the scope.
    key: String,
}
1674
1675impl<'a> ScopedRateLimiter<'a> {
1676 pub fn try_acquire(&self) -> bool {
1677 self.factory.try_acquire(&self.provider, &self.key)
1678 }
1679
1680 pub async fn acquire(&self) {
1681 self.factory.acquire(&self.provider, &self.key).await;
1682 }
1683}
1684
/// Registry of connectors, at most one per provider.
pub struct ConnectorRegistry {
    /// Shared connector handles keyed by provider id.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1689
1690impl ConnectorRegistry {
1691 pub fn empty() -> Self {
1692 Self {
1693 connectors: BTreeMap::new(),
1694 }
1695 }
1696
1697 pub fn with_defaults() -> Self {
1698 let mut registry = Self::empty();
1699 for provider in registered_provider_metadata() {
1700 if !matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
1701 continue;
1702 }
1703 registry
1704 .register(default_connector_for_provider(&provider))
1705 .expect("default connector registration should not fail");
1706 }
1707 registry
1708 }
1709
1710 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1711 let provider = connector.provider_id().clone();
1712 if self.connectors.contains_key(&provider) {
1713 return Err(ConnectorError::DuplicateProvider(provider.0));
1714 }
1715 self.connectors
1716 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1717 Ok(())
1718 }
1719
1720 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1721 self.connectors.get(id).cloned()
1722 }
1723
1724 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1725 self.connectors.remove(id)
1726 }
1727
1728 pub fn list(&self) -> Vec<ProviderId> {
1729 self.connectors.keys().cloned().collect()
1730 }
1731
1732 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1733 for connector in self.connectors.values() {
1734 connector.lock().await.init(ctx.clone()).await?;
1735 }
1736 Ok(())
1737 }
1738
1739 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1740 let mut clients = BTreeMap::new();
1741 for (provider, connector) in &self.connectors {
1742 let client = connector.lock().await.client();
1743 clients.insert(provider.clone(), client);
1744 }
1745 clients
1746 }
1747
1748 pub async fn activate_all(
1749 &self,
1750 registry: &TriggerRegistry,
1751 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1752 let mut handles = Vec::new();
1753 for (provider, connector) in &self.connectors {
1754 let bindings = registry.bindings_for(provider);
1755 if bindings.is_empty() {
1756 continue;
1757 }
1758 let connector = connector.lock().await;
1759 handles.push(connector.activate(bindings).await?);
1760 }
1761 Ok(handles)
1762 }
1763}
1764
1765impl Default for ConnectorRegistry {
1766 fn default() -> Self {
1767 Self::with_defaults()
1768 }
1769}
1770
1771fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1772 match &provider.runtime {
1773 ProviderRuntimeMetadata::Builtin {
1774 connector,
1775 default_signature_variant,
1776 } => match connector.as_str() {
1777 "a2a-push" => Box::new(A2aPushConnector::new()),
1778 "cron" => Box::new(CronConnector::new()),
1779 "stream" => Box::new(StreamConnector::new(
1780 ProviderId::from(provider.provider.clone()),
1781 provider.schema_name.clone(),
1782 )),
1783 "webhook" => {
1784 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1785 .expect("catalog webhook signature variant must be valid");
1786 Box::new(GenericWebhookConnector::with_profile(
1787 WebhookProviderProfile::new(
1788 ProviderId::from(provider.provider.clone()),
1789 provider.schema_name.clone(),
1790 variant,
1791 ),
1792 ))
1793 }
1794 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1795 },
1796 ProviderRuntimeMetadata::Placeholder => {
1797 Box::new(PlaceholderConnector::from_metadata(provider))
1798 }
1799 }
1800}
1801
/// Connector for providers that are cataloged but have no concrete
/// implementation; its `normalize_inbound` always returns `Unsupported`.
struct PlaceholderConnector {
    /// Provider this placeholder stands in for.
    provider_id: ProviderId,
    /// Trigger kinds copied from the provider metadata.
    kinds: Vec<TriggerKind>,
    /// Schema name reported by `payload_schema`.
    schema_name: String,
}
1807
1808impl PlaceholderConnector {
1809 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1810 Self {
1811 provider_id: ProviderId::from(metadata.provider.clone()),
1812 kinds: metadata
1813 .kinds
1814 .iter()
1815 .cloned()
1816 .map(TriggerKind::from)
1817 .collect(),
1818 schema_name: metadata.schema_name.clone(),
1819 }
1820 }
1821}
1822
1823struct PlaceholderClient;
1824
1825#[async_trait]
1826impl ConnectorClient for PlaceholderClient {
1827 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1828 Err(ClientError::Other(format!(
1829 "connector client method '{method}' is not implemented for this provider"
1830 )))
1831 }
1832}
1833
1834#[async_trait]
1835impl Connector for PlaceholderConnector {
1836 fn provider_id(&self) -> &ProviderId {
1837 &self.provider_id
1838 }
1839
1840 fn kinds(&self) -> &[TriggerKind] {
1841 &self.kinds
1842 }
1843
1844 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1845 Ok(())
1846 }
1847
1848 async fn activate(
1849 &self,
1850 bindings: &[TriggerBinding],
1851 ) -> Result<ActivationHandle, ConnectorError> {
1852 Ok(ActivationHandle::new(
1853 self.provider_id.clone(),
1854 bindings.len(),
1855 ))
1856 }
1857
1858 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1859 Err(ConnectorError::Unsupported(format!(
1860 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1861 self.provider_id.as_str()
1862 )))
1863 }
1864
1865 fn payload_schema(&self) -> ProviderPayloadSchema {
1866 ProviderPayloadSchema::named(self.schema_name.clone())
1867 }
1868
1869 fn client(&self) -> Arc<dyn ConnectorClient> {
1870 Arc::new(PlaceholderClient)
1871 }
1872}
1873
1874pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1875 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1876 *slot.borrow_mut() = clients
1877 .into_iter()
1878 .map(|(provider, client)| (provider.as_str().to_string(), client))
1879 .collect();
1880 });
1881}
1882
1883pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1884 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1885}
1886
1887pub fn clear_active_connector_clients() {
1888 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1889}
1890
1891#[cfg(test)]
1892mod tests {
1893 use super::*;
1894
1895 use std::sync::atomic::{AtomicUsize, Ordering};
1896
1897 use async_trait::async_trait;
1898 use serde_json::json;
1899
1900 struct NoopClient;
1901
1902 #[async_trait]
1903 impl ConnectorClient for NoopClient {
1904 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1905 Ok(json!({ "method": method }))
1906 }
1907 }
1908
/// Test connector that counts how often `activate` is called.
struct FakeConnector {
    /// Provider id reported by the connector.
    provider_id: ProviderId,
    /// Trigger kinds reported by the connector.
    kinds: Vec<TriggerKind>,
    /// Shared counter incremented on every `activate` call.
    activate_calls: Arc<AtomicUsize>,
}
1914
1915 impl FakeConnector {
1916 fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
1917 Self {
1918 provider_id: ProviderId::from(provider_id),
1919 kinds: vec![TriggerKind::from("webhook")],
1920 activate_calls,
1921 }
1922 }
1923 }
1924
1925 #[async_trait]
1926 impl Connector for FakeConnector {
1927 fn provider_id(&self) -> &ProviderId {
1928 &self.provider_id
1929 }
1930
1931 fn kinds(&self) -> &[TriggerKind] {
1932 &self.kinds
1933 }
1934
1935 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1936 Ok(())
1937 }
1938
1939 async fn activate(
1940 &self,
1941 bindings: &[TriggerBinding],
1942 ) -> Result<ActivationHandle, ConnectorError> {
1943 self.activate_calls.fetch_add(1, Ordering::SeqCst);
1944 Ok(ActivationHandle::new(
1945 self.provider_id.clone(),
1946 bindings.len(),
1947 ))
1948 }
1949
1950 async fn normalize_inbound(
1951 &self,
1952 _raw: RawInbound,
1953 ) -> Result<TriggerEvent, ConnectorError> {
1954 Err(ConnectorError::Unsupported(
1955 "not needed for registry tests".to_string(),
1956 ))
1957 }
1958
1959 fn payload_schema(&self) -> ProviderPayloadSchema {
1960 ProviderPayloadSchema::named("FakePayload")
1961 }
1962
1963 fn client(&self) -> Arc<dyn ConnectorClient> {
1964 Arc::new(NoopClient)
1965 }
1966 }
1967
1968 #[tokio::test]
1969 async fn connector_registry_rejects_duplicate_providers() {
1970 let activate_calls = Arc::new(AtomicUsize::new(0));
1971 let mut registry = ConnectorRegistry::empty();
1972 registry
1973 .register(Box::new(FakeConnector::new(
1974 "github",
1975 activate_calls.clone(),
1976 )))
1977 .unwrap();
1978
1979 let error = registry
1980 .register(Box::new(FakeConnector::new("github", activate_calls)))
1981 .unwrap_err();
1982 assert!(matches!(
1983 error,
1984 ConnectorError::DuplicateProvider(provider) if provider == "github"
1985 ));
1986 }
1987
1988 #[tokio::test]
1989 async fn connector_registry_activates_only_bound_connectors() {
1990 let github_calls = Arc::new(AtomicUsize::new(0));
1991 let slack_calls = Arc::new(AtomicUsize::new(0));
1992 let mut registry = ConnectorRegistry::empty();
1993 registry
1994 .register(Box::new(FakeConnector::new("github", github_calls.clone())))
1995 .unwrap();
1996 registry
1997 .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
1998 .unwrap();
1999
2000 let mut trigger_registry = TriggerRegistry::default();
2001 trigger_registry.register(TriggerBinding::new(
2002 ProviderId::from("github"),
2003 "webhook",
2004 "github.push",
2005 ));
2006 trigger_registry.register(TriggerBinding::new(
2007 ProviderId::from("github"),
2008 "webhook",
2009 "github.installation",
2010 ));
2011
2012 let handles = registry.activate_all(&trigger_registry).await.unwrap();
2013 assert_eq!(handles.len(), 1);
2014 assert_eq!(handles[0].provider.as_str(), "github");
2015 assert_eq!(handles[0].binding_count, 2);
2016 assert_eq!(github_calls.load(Ordering::SeqCst), 1);
2017 assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
2018 }
2019
/// Buckets are independent per `(provider, key)` scope.
#[test]
fn rate_limiter_scopes_tokens_by_provider_and_key() {
    // Capacity 1 with a 60 s refill interval: one call drains a bucket.
    let factory = RateLimiterFactory::new(RateLimitConfig {
        capacity: 1,
        refill_tokens: 1,
        refill_interval: StdDuration::from_secs(60),
    });

    // Same scope: the first call takes the only token, the second is refused.
    assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
    assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
    // A different key, or a different provider, has its own full bucket.
    assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
    assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
}
2033
/// `json_body` parses the stored body bytes as JSON.
#[test]
fn raw_inbound_json_body_preserves_raw_bytes() {
    let raw = RawInbound::new(
        "push",
        BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
        br#"{"ok":true}"#.to_vec(),
    );

    assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
}
2044
/// The default registry contains the core providers and not the service
/// providers `github` and `slack`.
#[test]
fn connector_registry_lists_core_catalog_providers() {
    let registry = ConnectorRegistry::default();
    let providers = registry.list();
    // Core providers are registered by default.
    assert!(providers.contains(&ProviderId::from("cron")));
    assert!(providers.contains(&ProviderId::from("webhook")));
    assert!(providers.contains(&ProviderId::from("kafka")));
    // Service providers are not part of the default registry.
    assert!(!providers.contains(&ProviderId::from("github")));
    assert!(!providers.contains(&ProviderId::from("slack")));
}
2055
/// Guards the catalog: every provider registered with a `Builtin` runtime must
/// be on the allow-list of core providers below.
#[test]
fn pure_harn_pivot_only_keeps_core_builtin_connectors() {
    let core_runtime_providers = [
        "a2a-push",
        "cron",
        "email",
        "kafka",
        "nats",
        "postgres-cdc",
        "pulsar",
        "webhook",
        "websocket",
    ];

    for provider in registered_provider_metadata() {
        // Providers with any other runtime are outside the scope of this guard.
        if matches!(provider.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
            let allowed_core = core_runtime_providers.contains(&provider.provider.as_str());
            assert!(
                allowed_core,
                "provider '{}' is registered as a Rust builtin connector; new service connectors \
                 must ship as pure-Harn packages and register with connector = {{ harn = \"...\" }}",
                provider.provider
            );
        }
    }
}
2084
/// Records one sample for each orchestrator metric family and checks that the
/// Prometheus rendering contains the expected sample lines.
#[test]
fn metrics_registry_exports_orchestrator_metric_families() {
    // Identifiers shared by most of the recorded samples.
    let trigger = "github-new-issue";
    let binding = "github-new-issue@v7";
    let provider = "github";
    let tenant = Some("tenant-a");
    let pending_topic = "orchestrator.triggers.pending";
    let inbox_topic = "trigger.inbox.envelopes";

    let metrics = MetricsRegistry::default();
    metrics.record_http_request(
        "/triggers/github",
        "POST",
        200,
        StdDuration::from_millis(25),
        512,
    );
    metrics.record_trigger_received(trigger, provider);
    metrics.record_trigger_deduped(trigger, "inbox_duplicate");
    metrics.record_trigger_predicate_evaluation(trigger, true, 0.002);
    metrics.record_trigger_dispatched(trigger, "local", "succeeded");
    metrics.record_trigger_retry(trigger, 2);
    metrics.record_trigger_dlq(trigger, "retry_exhausted");
    metrics.set_trigger_inflight(trigger, 0);
    metrics.record_event_log_append(pending_topic, StdDuration::from_millis(1), 2048);
    metrics.set_event_log_consumer_lag(pending_topic, "orchestrator-pump", 0);
    metrics.set_trigger_budget_cost_today(trigger, 0.002);
    metrics.record_trigger_budget_exhausted(trigger, "daily_budget_exceeded");
    metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
    metrics.set_worker_queue_depth("triage", 1);
    metrics.record_worker_queue_claim_age("triage", 3.0);
    metrics.set_orchestrator_pump_backlog(inbox_topic, 2);
    metrics.set_orchestrator_pump_outstanding(inbox_topic, 1);
    metrics.record_orchestrator_pump_admission_delay(inbox_topic, StdDuration::from_millis(50));
    metrics.record_trigger_accepted_to_normalized(
        trigger,
        binding,
        provider,
        tenant,
        "normalized",
        StdDuration::from_millis(25),
    );
    metrics.record_trigger_accepted_to_queue_append(
        trigger,
        binding,
        provider,
        tenant,
        "queued",
        StdDuration::from_millis(40),
    );
    metrics.record_trigger_queue_age_at_dispatch_admission(
        trigger,
        binding,
        provider,
        tenant,
        "admitted",
        StdDuration::from_millis(75),
    );
    metrics.record_trigger_queue_age_at_dispatch_start(
        trigger,
        binding,
        provider,
        tenant,
        "started",
        StdDuration::from_millis(125),
    );
    metrics.record_trigger_dispatch_runtime(
        trigger,
        binding,
        provider,
        tenant,
        "succeeded",
        StdDuration::from_millis(250),
    );
    metrics.record_trigger_retry_delay(
        trigger,
        binding,
        provider,
        tenant,
        "scheduled",
        StdDuration::from_secs(2),
    );
    metrics.record_trigger_accepted_to_dlq(
        trigger,
        binding,
        provider,
        tenant,
        "retry_exhausted",
        StdDuration::from_secs(45),
    );
    metrics.record_backpressure_event("ingest", "reject");
    // Accepted at 1 000 ms, observed at 4 000 ms: the pending-age gauge reads 3.
    metrics.note_trigger_pending_event(
        "evt-1", trigger, binding, provider, tenant, 1_000, 4_000,
    );
    metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
    metrics.record_llm_cache_hit("mock");

    let rendered = metrics.render_prometheus();
    let expected_samples = [
        "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
        "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
        "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
        "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
        "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
        "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
        "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
        "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
        "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
        "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
        "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
        "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
        "harn_worker_queue_depth{queue=\"triage\"} 1",
        "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
        "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
        "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
        "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
        "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
        "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
        "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
        "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
        "harn_llm_cache_hits_total{provider=\"mock\"} 1",
    ];
    for needle in expected_samples {
        assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
    }
}
2229}