1use std::cell::RefCell;
9use std::collections::{BTreeMap, HashMap};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::Duration as StdDuration;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value as JsonValue;
18use time::OffsetDateTime;
19use tokio::sync::Mutex as AsyncMutex;
20
21use crate::event_log::AnyEventLog;
22use crate::secrets::SecretProvider;
23use crate::triggers::test_util::clock::{self, ClockInstant};
24use crate::triggers::{
25 registered_provider_metadata, InboxIndex, ProviderId, ProviderMetadata,
26 ProviderRuntimeMetadata, TenantId, TriggerEvent,
27};
28
29pub mod a2a_push;
30pub mod cron;
31pub mod github;
32pub mod harn_module;
33pub mod hmac;
34pub mod linear;
35pub mod notion;
36pub mod slack;
37pub mod stream;
38#[cfg(test)]
39pub(crate) mod test_util;
40pub mod testkit;
41pub mod webhook;
42
43pub use a2a_push::A2aPushConnector;
44pub use cron::{CatchupMode, CronConnector};
45pub use github::GitHubConnector;
46pub use harn_module::{
47 load_contract as load_harn_connector_contract, HarnConnector, HarnConnectorContract,
48};
49pub use hmac::{
50 verify_hmac_authorization, HmacSignatureStyle, DEFAULT_CANONICAL_AUTHORIZATION_HEADER,
51 DEFAULT_CANONICAL_HMAC_SCHEME, DEFAULT_GITHUB_SIGNATURE_HEADER,
52 DEFAULT_LINEAR_SIGNATURE_HEADER, DEFAULT_NOTION_SIGNATURE_HEADER,
53 DEFAULT_SLACK_SIGNATURE_HEADER, DEFAULT_SLACK_TIMESTAMP_HEADER,
54 DEFAULT_STANDARD_WEBHOOKS_ID_HEADER, DEFAULT_STANDARD_WEBHOOKS_SIGNATURE_HEADER,
55 DEFAULT_STANDARD_WEBHOOKS_TIMESTAMP_HEADER, DEFAULT_STRIPE_SIGNATURE_HEADER,
56 SIGNATURE_VERIFY_AUDIT_TOPIC,
57};
58pub use linear::LinearConnector;
59pub use notion::{
60 load_pending_webhook_handshakes, NotionConnector, PersistedNotionWebhookHandshake,
61};
62pub use slack::SlackConnector;
63pub use stream::StreamConnector;
64use webhook::WebhookProviderProfile;
65pub use webhook::{GenericWebhookConnector, WebhookSignatureVariant};
66
/// Shared handle to a boxed [`Connector`] trait object: reference-counted via
/// `Arc` and guarded by an async (`tokio`) mutex.
pub type ConnectorHandle = Arc<AsyncMutex<Box<dyn Connector>>>;
69
// Per-thread map from a string key to a shared `ConnectorClient`; it starts empty.
// NOTE(review): the key is presumably a provider id — confirm against the code
// that populates this map.
thread_local! {
    static ACTIVE_CONNECTOR_CLIENTS: RefCell<BTreeMap<String, Arc<dyn ConnectorClient>>> =
        RefCell::new(BTreeMap::new());
}
74
/// Interface implemented by every trigger connector.
///
/// Implementors must be `Send + Sync`; async methods are provided through
/// `async_trait`.
#[async_trait]
pub trait Connector: Send + Sync {
    /// Returns the provider identifier of this connector.
    fn provider_id(&self) -> &ProviderId;

    /// Returns the trigger kinds associated with this connector.
    fn kinds(&self) -> &[TriggerKind];

    /// Initializes the connector with the shared runtime context.
    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError>;

    /// Activates the connector for the given trigger bindings and returns an
    /// activation handle on success.
    async fn activate(
        &self,
        bindings: &[TriggerBinding],
    ) -> Result<ActivationHandle, ConnectorError>;

    /// Shuts the connector down. The default implementation ignores the
    /// deadline and returns `Ok(())` immediately.
    async fn shutdown(&self, _deadline: StdDuration) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Converts a raw inbound payload into a single [`TriggerEvent`].
    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError>;

    /// Converts a raw inbound payload into a [`ConnectorNormalizeResult`].
    ///
    /// The default implementation delegates to [`Connector::normalize_inbound`]
    /// and wraps the event in [`ConnectorNormalizeResult::Event`].
    async fn normalize_inbound_result(
        &self,
        raw: RawInbound,
    ) -> Result<ConnectorNormalizeResult, ConnectorError> {
        self.normalize_inbound(raw)
            .await
            .map(ConnectorNormalizeResult::event)
    }

    /// Returns the payload schema descriptor for this provider.
    fn payload_schema(&self) -> ProviderPayloadSchema;

    /// Returns a shared client for calling this connector's methods.
    fn client(&self) -> Arc<dyn ConnectorClient>;
}
118
/// HTTP response description produced by a connector: status code, headers,
/// and a JSON body.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorHttpResponse {
    /// HTTP status code.
    pub status: u16,
    /// Response headers, keyed by header name.
    pub headers: BTreeMap<String, String>,
    /// Response body as a JSON value.
    pub body: JsonValue,
}
126
127impl ConnectorHttpResponse {
128 pub fn new(status: u16, headers: BTreeMap<String, String>, body: JsonValue) -> Self {
129 Self {
130 status,
131 headers,
132 body,
133 }
134 }
135}
136
/// Outcome of normalizing one raw inbound delivery.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectorNormalizeResult {
    /// Exactly one event (boxed).
    Event(Box<TriggerEvent>),
    /// Zero or more events produced from a single delivery.
    Batch(Vec<TriggerEvent>),
    /// An HTTP response together with the events carried by the delivery.
    ImmediateResponse {
        response: ConnectorHttpResponse,
        events: Vec<TriggerEvent>,
    },
    /// The delivery is rejected with the given HTTP response; it yields no events.
    Reject(ConnectorHttpResponse),
}
148
149impl ConnectorNormalizeResult {
150 pub fn event(event: TriggerEvent) -> Self {
151 Self::Event(Box::new(event))
152 }
153
154 pub fn into_events(self) -> Vec<TriggerEvent> {
155 match self {
156 Self::Event(event) => vec![*event],
157 Self::Batch(events) | Self::ImmediateResponse { events, .. } => events,
158 Self::Reject(_) => Vec::new(),
159 }
160 }
161}
162
/// Result of [`postprocess_normalized_event`].
#[derive(Clone, Debug, PartialEq)]
pub enum PostNormalizeOutcome {
    /// The event passed post-processing and is returned (boxed) to the caller.
    Ready(Box<TriggerEvent>),
    /// The inbox reported the event's dedupe key as already seen; the event was dropped.
    DuplicateDropped,
}
168
169pub async fn postprocess_normalized_event(
170 inbox: &InboxIndex,
171 binding_id: &str,
172 dedupe_enabled: bool,
173 dedupe_ttl: StdDuration,
174 mut event: TriggerEvent,
175) -> Result<PostNormalizeOutcome, ConnectorError> {
176 if dedupe_enabled && !event.dedupe_claimed() {
177 if !inbox
178 .insert_if_new(binding_id, &event.dedupe_key, dedupe_ttl)
179 .await?
180 {
181 return Ok(PostNormalizeOutcome::DuplicateDropped);
182 }
183 event.mark_dedupe_claimed();
184 }
185
186 Ok(PostNormalizeOutcome::Ready(Box::new(event)))
187}
188
/// Client interface for invoking a connector method by name with JSON arguments.
#[async_trait]
pub trait ConnectorClient: Send + Sync {
    /// Calls `method` with `args` and returns the JSON result, or a
    /// [`ClientError`] on failure.
    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError>;
}
194
/// Error returned by [`ConnectorClient::call`]. Every variant carries a
/// message string, which is what `Display` prints.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientError {
    /// The requested method is not known to the client.
    MethodNotFound(String),
    /// The supplied arguments were invalid.
    InvalidArgs(String),
    /// The call was rate limited.
    RateLimited(String),
    /// A transport-level failure occurred.
    Transport(String),
    /// Any other failure.
    Other(String),
}
204
205impl fmt::Display for ClientError {
206 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207 match self {
208 Self::MethodNotFound(message)
209 | Self::InvalidArgs(message)
210 | Self::RateLimited(message)
211 | Self::Transport(message)
212 | Self::Other(message) => message.fmt(f),
213 }
214 }
215}
216
// Empty impl: relies entirely on the default `Error` methods plus the
// `Debug`/`Display` implementations of `ClientError`.
impl std::error::Error for ClientError {}
218
/// Errors produced by connectors and the connector runtime.
#[derive(Debug)]
pub enum ConnectorError {
    /// A connector for this provider is already registered.
    DuplicateProvider(String),
    /// Duplicate delivery; carries a message printed verbatim.
    DuplicateDelivery(String),
    /// No connector is registered for this provider.
    UnknownProvider(String),
    /// A required header (named by the payload) is missing.
    MissingHeader(String),
    /// A header was present but invalid.
    InvalidHeader {
        /// Header name.
        name: String,
        /// Description of what is wrong with the header.
        detail: String,
    },
    /// Signature verification failed; carries a message printed verbatim.
    InvalidSignature(String),
    /// A timestamp fell outside the allowed verification window around `now`.
    TimestampOutOfWindow {
        timestamp: OffsetDateTime,
        now: OffsetDateTime,
        window: time::Duration,
    },
    /// JSON error message (also produced from `serde_json::Error`).
    Json(String),
    /// Secret provider error message (also produced from `SecretError`).
    Secret(String),
    /// Event log error message (also produced from `LogError`).
    EventLog(String),
    /// Harn runtime error message.
    HarnRuntime(String),
    /// Error returned by a [`ConnectorClient`].
    Client(ClientError),
    /// Unsupported operation; carries a message printed verbatim.
    Unsupported(String),
    /// Activation failure; carries a message printed verbatim.
    Activation(String),
}
244
245impl ConnectorError {
246 pub fn invalid_signature(message: impl Into<String>) -> Self {
247 Self::InvalidSignature(message.into())
248 }
249}
250
251impl fmt::Display for ConnectorError {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 match self {
254 Self::DuplicateProvider(provider) => {
255 write!(f, "connector provider `{provider}` is already registered")
256 }
257 Self::DuplicateDelivery(message) => message.fmt(f),
258 Self::UnknownProvider(provider) => {
259 write!(f, "connector provider `{provider}` is not registered")
260 }
261 Self::MissingHeader(header) => write!(f, "missing required header `{header}`"),
262 Self::InvalidHeader { name, detail } => {
263 write!(f, "invalid header `{name}`: {detail}")
264 }
265 Self::InvalidSignature(message)
266 | Self::Json(message)
267 | Self::Secret(message)
268 | Self::EventLog(message)
269 | Self::HarnRuntime(message)
270 | Self::Unsupported(message)
271 | Self::Activation(message) => message.fmt(f),
272 Self::TimestampOutOfWindow {
273 timestamp,
274 now,
275 window,
276 } => write!(
277 f,
278 "timestamp {timestamp} is outside the allowed verification window of {window} around {now}"
279 ),
280 Self::Client(error) => error.fmt(f),
281 }
282 }
283}
284
// Empty impl: relies on the default `Error` methods; no `source()` is exposed,
// so wrapped errors are only visible through the `Display` text.
impl std::error::Error for ConnectorError {}
286
287impl From<crate::event_log::LogError> for ConnectorError {
288 fn from(value: crate::event_log::LogError) -> Self {
289 Self::EventLog(value.to_string())
290 }
291}
292
293impl From<crate::secrets::SecretError> for ConnectorError {
294 fn from(value: crate::secrets::SecretError) -> Self {
295 Self::Secret(value.to_string())
296 }
297}
298
299impl From<serde_json::Error> for ConnectorError {
300 fn from(value: serde_json::Error) -> Self {
301 Self::Json(value.to_string())
302 }
303}
304
305impl From<ClientError> for ConnectorError {
306 fn from(value: ClientError) -> Self {
307 Self::Client(value)
308 }
309}
310
/// Shared runtime services handed to a connector in [`Connector::init`].
///
/// Every field is an `Arc`, so cloning the context only bumps reference counts.
#[derive(Clone)]
pub struct ConnectorCtx {
    /// Event log handle.
    pub event_log: Arc<AnyEventLog>,
    /// Secret provider.
    pub secrets: Arc<dyn SecretProvider>,
    /// Inbox index (used for delivery de-duplication by
    /// `postprocess_normalized_event`).
    pub inbox: Arc<InboxIndex>,
    /// Metrics registry.
    pub metrics: Arc<MetricsRegistry>,
    /// Rate limiter factory.
    pub rate_limiter: Arc<RateLimiterFactory>,
}
320
/// Point-in-time copy of the fixed atomic counters in [`MetricsRegistry`],
/// as produced by `MetricsRegistry::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectorMetricsSnapshot {
    /// Number of inbox claims recorded.
    pub inbox_claims_written: u64,
    /// Number of duplicates rejected (fast-path and durable hits combined).
    pub inbox_duplicates_rejected: u64,
    /// Duplicates detected on the fast path.
    pub inbox_fast_path_hits: u64,
    /// Duplicates detected on the durable path.
    pub inbox_durable_hits: u64,
    /// Total expired inbox entries recorded.
    pub inbox_expired_entries: u64,
    /// Most recently reported number of active inbox entries.
    pub inbox_active_entries: u64,
    /// Linear timestamp rejections recorded.
    pub linear_timestamp_rejections_total: u64,
    /// Successful dispatches recorded.
    pub dispatch_succeeded_total: u64,
    /// Failed dispatches recorded.
    pub dispatch_failed_total: u64,
    /// Retries scheduled.
    pub retry_scheduled_total: u64,
    /// Slack delivery successes recorded.
    pub slack_delivery_success_total: u64,
    /// Slack delivery failures recorded.
    pub slack_delivery_failure_total: u64,
}
337
/// Label set of a metric sample: label name -> label value, kept sorted by name.
type MetricLabels = BTreeMap<String, String>;
339
/// Accumulated state of one histogram series.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistogramMetric {
    /// Cumulative bucket counts keyed by the rendered upper bound (the `le`
    /// label value, including `"+Inf"`). Keys are strings, so map iteration
    /// order is lexicographic rather than numeric.
    buckets: BTreeMap<String, u64>,
    /// Number of observations.
    count: u64,
    /// Sum of all observed values.
    sum: f64,
}
346
/// Process-wide slot for the currently installed metrics registry. The outer
/// `OnceLock` lazily creates the mutex; the inner `Option` is `None` while no
/// registry is installed.
static ACTIVE_METRICS_REGISTRY: OnceLock<Mutex<Option<Arc<MetricsRegistry>>>> = OnceLock::new();
348
349pub fn install_active_metrics_registry(metrics: Arc<MetricsRegistry>) {
350 let slot = ACTIVE_METRICS_REGISTRY.get_or_init(|| Mutex::new(None));
351 *slot.lock().expect("active metrics registry poisoned") = Some(metrics);
352}
353
354pub fn clear_active_metrics_registry() {
355 if let Some(slot) = ACTIVE_METRICS_REGISTRY.get() {
356 *slot.lock().expect("active metrics registry poisoned") = None;
357 }
358}
359
360pub fn active_metrics_registry() -> Option<Arc<MetricsRegistry>> {
361 ACTIVE_METRICS_REGISTRY.get().and_then(|slot| {
362 slot.lock()
363 .expect("active metrics registry poisoned")
364 .clone()
365 })
366}
367
/// In-memory metrics store: a fixed set of atomic counters plus mutex-guarded
/// maps for labelled counters, gauges, and histograms.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    // Fixed counters, mirrored field-for-field by `ConnectorMetricsSnapshot`.
    inbox_claims_written: AtomicU64,
    inbox_duplicates_rejected: AtomicU64,
    inbox_fast_path_hits: AtomicU64,
    inbox_durable_hits: AtomicU64,
    inbox_expired_entries: AtomicU64,
    inbox_active_entries: AtomicU64,
    linear_timestamp_rejections_total: AtomicU64,
    dispatch_succeeded_total: AtomicU64,
    dispatch_failed_total: AtomicU64,
    retry_scheduled_total: AtomicU64,
    slack_delivery_success_total: AtomicU64,
    slack_delivery_failure_total: AtomicU64,
    // Unlabelled counters keyed by caller-supplied name.
    custom_counters: Mutex<BTreeMap<String, u64>>,
    // Labelled series keyed by (metric name, label set).
    counters: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    gauges: Mutex<BTreeMap<(String, MetricLabels), f64>>,
    histograms: Mutex<BTreeMap<(String, MetricLabels), HistogramMetric>>,
    // Pending trigger events per label set: event id -> accepted-at timestamp (ms).
    pending_trigger_events: Mutex<BTreeMap<MetricLabels, BTreeMap<String, i64>>>,
}
389
390impl MetricsRegistry {
391 const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0];
392 const TRIGGER_LATENCY_BUCKETS: [f64; 15] = [
393 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
394 ];
395 const SIZE_BUCKETS: [f64; 9] = [
396 128.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 10485760.0,
397 ];
398
399 pub fn snapshot(&self) -> ConnectorMetricsSnapshot {
400 ConnectorMetricsSnapshot {
401 inbox_claims_written: self.inbox_claims_written.load(Ordering::Relaxed),
402 inbox_duplicates_rejected: self.inbox_duplicates_rejected.load(Ordering::Relaxed),
403 inbox_fast_path_hits: self.inbox_fast_path_hits.load(Ordering::Relaxed),
404 inbox_durable_hits: self.inbox_durable_hits.load(Ordering::Relaxed),
405 inbox_expired_entries: self.inbox_expired_entries.load(Ordering::Relaxed),
406 inbox_active_entries: self.inbox_active_entries.load(Ordering::Relaxed),
407 linear_timestamp_rejections_total: self
408 .linear_timestamp_rejections_total
409 .load(Ordering::Relaxed),
410 dispatch_succeeded_total: self.dispatch_succeeded_total.load(Ordering::Relaxed),
411 dispatch_failed_total: self.dispatch_failed_total.load(Ordering::Relaxed),
412 retry_scheduled_total: self.retry_scheduled_total.load(Ordering::Relaxed),
413 slack_delivery_success_total: self.slack_delivery_success_total.load(Ordering::Relaxed),
414 slack_delivery_failure_total: self.slack_delivery_failure_total.load(Ordering::Relaxed),
415 }
416 }
417
418 pub(crate) fn record_inbox_claim(&self) {
419 self.inbox_claims_written.fetch_add(1, Ordering::Relaxed);
420 }
421
422 pub(crate) fn record_inbox_duplicate_fast_path(&self) {
423 self.inbox_duplicates_rejected
424 .fetch_add(1, Ordering::Relaxed);
425 self.inbox_fast_path_hits.fetch_add(1, Ordering::Relaxed);
426 }
427
428 pub(crate) fn record_inbox_duplicate_durable(&self) {
429 self.inbox_duplicates_rejected
430 .fetch_add(1, Ordering::Relaxed);
431 self.inbox_durable_hits.fetch_add(1, Ordering::Relaxed);
432 }
433
434 pub(crate) fn record_inbox_expired_entries(&self, count: u64) {
435 if count > 0 {
436 self.inbox_expired_entries
437 .fetch_add(count, Ordering::Relaxed);
438 }
439 }
440
441 pub(crate) fn set_inbox_active_entries(&self, count: usize) {
442 self.inbox_active_entries
443 .store(count as u64, Ordering::Relaxed);
444 }
445
446 pub fn record_linear_timestamp_rejection(&self) {
447 self.linear_timestamp_rejections_total
448 .fetch_add(1, Ordering::Relaxed);
449 }
450
451 pub fn record_dispatch_succeeded(&self) {
452 self.dispatch_succeeded_total
453 .fetch_add(1, Ordering::Relaxed);
454 }
455
456 pub fn record_dispatch_failed(&self) {
457 self.dispatch_failed_total.fetch_add(1, Ordering::Relaxed);
458 }
459
460 pub fn record_retry_scheduled(&self) {
461 self.retry_scheduled_total.fetch_add(1, Ordering::Relaxed);
462 }
463
464 pub fn record_slack_delivery_success(&self) {
465 self.slack_delivery_success_total
466 .fetch_add(1, Ordering::Relaxed);
467 }
468
469 pub fn record_slack_delivery_failure(&self) {
470 self.slack_delivery_failure_total
471 .fetch_add(1, Ordering::Relaxed);
472 }
473
474 pub fn record_custom_counter(&self, name: &str, amount: u64) {
475 if amount == 0 {
476 return;
477 }
478 let mut counters = self
479 .custom_counters
480 .lock()
481 .expect("custom counters poisoned");
482 *counters.entry(name.to_string()).or_default() += amount;
483 }
484
485 pub fn record_http_request(
486 &self,
487 endpoint: &str,
488 method: &str,
489 status: u16,
490 duration: StdDuration,
491 body_size_bytes: usize,
492 ) {
493 self.increment_counter(
494 "harn_http_requests_total",
495 labels([
496 ("endpoint", endpoint),
497 ("method", method),
498 ("status", &status.to_string()),
499 ]),
500 1,
501 );
502 self.observe_histogram(
503 "harn_http_request_duration_seconds",
504 labels([("endpoint", endpoint)]),
505 duration.as_secs_f64(),
506 &Self::DURATION_BUCKETS,
507 );
508 self.observe_histogram(
509 "harn_http_body_size_bytes",
510 labels([("endpoint", endpoint)]),
511 body_size_bytes as f64,
512 &Self::SIZE_BUCKETS,
513 );
514 }
515
516 pub fn record_trigger_received(&self, trigger_id: &str, provider: &str) {
517 self.increment_counter(
518 "harn_trigger_received_total",
519 labels([("trigger_id", trigger_id), ("provider", provider)]),
520 1,
521 );
522 }
523
524 pub fn record_trigger_deduped(&self, trigger_id: &str, reason: &str) {
525 self.increment_counter(
526 "harn_trigger_deduped_total",
527 labels([("trigger_id", trigger_id), ("reason", reason)]),
528 1,
529 );
530 }
531
532 pub fn record_trigger_predicate_evaluation(
533 &self,
534 trigger_id: &str,
535 result: bool,
536 cost_usd: f64,
537 ) {
538 self.increment_counter(
539 "harn_trigger_predicate_evaluations_total",
540 labels([
541 ("trigger_id", trigger_id),
542 ("result", if result { "true" } else { "false" }),
543 ]),
544 1,
545 );
546 self.observe_histogram(
547 "harn_trigger_predicate_cost_usd",
548 labels([("trigger_id", trigger_id)]),
549 cost_usd.max(0.0),
550 &[0.0, 0.001, 0.01, 0.05, 0.1, 1.0],
551 );
552 }
553
554 pub fn record_trigger_dispatched(&self, trigger_id: &str, handler_kind: &str, outcome: &str) {
555 self.increment_counter(
556 "harn_trigger_dispatched_total",
557 labels([
558 ("trigger_id", trigger_id),
559 ("handler_kind", handler_kind),
560 ("outcome", outcome),
561 ]),
562 1,
563 );
564 }
565
566 pub fn record_trigger_retry(&self, trigger_id: &str, attempt: u32) {
567 self.increment_counter(
568 "harn_trigger_retries_total",
569 labels([
570 ("trigger_id", trigger_id),
571 ("attempt", &attempt.to_string()),
572 ]),
573 1,
574 );
575 }
576
577 pub fn record_trigger_dlq(&self, trigger_id: &str, reason: &str) {
578 self.increment_counter(
579 "harn_trigger_dlq_total",
580 labels([("trigger_id", trigger_id), ("reason", reason)]),
581 1,
582 );
583 }
584
585 pub fn record_trigger_accepted_to_normalized(
586 &self,
587 trigger_id: &str,
588 binding_key: &str,
589 provider: &str,
590 tenant_id: Option<&str>,
591 status: &str,
592 duration: StdDuration,
593 ) {
594 self.observe_histogram(
595 "harn_trigger_webhook_accepted_to_normalized_seconds",
596 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
597 duration.as_secs_f64(),
598 &Self::TRIGGER_LATENCY_BUCKETS,
599 );
600 }
601
602 pub fn record_trigger_accepted_to_queue_append(
603 &self,
604 trigger_id: &str,
605 binding_key: &str,
606 provider: &str,
607 tenant_id: Option<&str>,
608 status: &str,
609 duration: StdDuration,
610 ) {
611 self.observe_histogram(
612 "harn_trigger_webhook_accepted_to_queue_append_seconds",
613 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
614 duration.as_secs_f64(),
615 &Self::TRIGGER_LATENCY_BUCKETS,
616 );
617 }
618
619 pub fn record_trigger_queue_age_at_dispatch_admission(
620 &self,
621 trigger_id: &str,
622 binding_key: &str,
623 provider: &str,
624 tenant_id: Option<&str>,
625 status: &str,
626 age: StdDuration,
627 ) {
628 self.observe_histogram(
629 "harn_trigger_queue_age_at_dispatch_admission_seconds",
630 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
631 age.as_secs_f64(),
632 &Self::TRIGGER_LATENCY_BUCKETS,
633 );
634 }
635
636 pub fn record_trigger_queue_age_at_dispatch_start(
637 &self,
638 trigger_id: &str,
639 binding_key: &str,
640 provider: &str,
641 tenant_id: Option<&str>,
642 status: &str,
643 age: StdDuration,
644 ) {
645 self.observe_histogram(
646 "harn_trigger_queue_age_at_dispatch_start_seconds",
647 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
648 age.as_secs_f64(),
649 &Self::TRIGGER_LATENCY_BUCKETS,
650 );
651 }
652
653 pub fn record_trigger_dispatch_runtime(
654 &self,
655 trigger_id: &str,
656 binding_key: &str,
657 provider: &str,
658 tenant_id: Option<&str>,
659 status: &str,
660 duration: StdDuration,
661 ) {
662 self.observe_histogram(
663 "harn_trigger_dispatch_runtime_seconds",
664 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
665 duration.as_secs_f64(),
666 &Self::TRIGGER_LATENCY_BUCKETS,
667 );
668 }
669
670 pub fn record_trigger_retry_delay(
671 &self,
672 trigger_id: &str,
673 binding_key: &str,
674 provider: &str,
675 tenant_id: Option<&str>,
676 status: &str,
677 duration: StdDuration,
678 ) {
679 self.observe_histogram(
680 "harn_trigger_retry_delay_seconds",
681 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
682 duration.as_secs_f64(),
683 &Self::TRIGGER_LATENCY_BUCKETS,
684 );
685 }
686
687 pub fn record_trigger_accepted_to_dlq(
688 &self,
689 trigger_id: &str,
690 binding_key: &str,
691 provider: &str,
692 tenant_id: Option<&str>,
693 status: &str,
694 duration: StdDuration,
695 ) {
696 self.observe_histogram(
697 "harn_trigger_accepted_to_dlq_seconds",
698 trigger_lifecycle_labels(trigger_id, binding_key, provider, tenant_id, status),
699 duration.as_secs_f64(),
700 &Self::TRIGGER_LATENCY_BUCKETS,
701 );
702 }
703
704 pub fn note_trigger_pending_event(
705 &self,
706 event_id: &str,
707 trigger_id: &str,
708 binding_key: &str,
709 provider: &str,
710 tenant_id: Option<&str>,
711 accepted_at_ms: i64,
712 now_ms: i64,
713 ) {
714 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
715 {
716 let mut pending = self
717 .pending_trigger_events
718 .lock()
719 .expect("pending trigger events poisoned");
720 pending
721 .entry(labels.clone())
722 .or_default()
723 .insert(event_id.to_string(), accepted_at_ms);
724 }
725 self.refresh_oldest_pending_gauge(labels, now_ms);
726 }
727
728 pub fn clear_trigger_pending_event(
729 &self,
730 event_id: &str,
731 trigger_id: &str,
732 binding_key: &str,
733 provider: &str,
734 tenant_id: Option<&str>,
735 now_ms: i64,
736 ) {
737 let labels = trigger_pending_labels(trigger_id, binding_key, provider, tenant_id);
738 {
739 let mut pending = self
740 .pending_trigger_events
741 .lock()
742 .expect("pending trigger events poisoned");
743 if let Some(events) = pending.get_mut(&labels) {
744 events.remove(event_id);
745 if events.is_empty() {
746 pending.remove(&labels);
747 }
748 }
749 }
750 self.refresh_oldest_pending_gauge(labels, now_ms);
751 }
752
753 pub fn set_trigger_inflight(&self, trigger_id: &str, count: u64) {
754 self.set_gauge(
755 "harn_trigger_inflight",
756 labels([("trigger_id", trigger_id)]),
757 count as f64,
758 );
759 }
760
761 pub fn set_trigger_budget_cost_today(&self, trigger_id: &str, cost_usd: f64) {
762 self.set_gauge(
763 "harn_trigger_budget_cost_today_usd",
764 labels([("trigger_id", trigger_id)]),
765 cost_usd.max(0.0),
766 );
767 }
768
769 pub fn record_trigger_budget_exhausted(&self, trigger_id: &str, strategy: &str) {
770 self.increment_counter(
771 "harn_trigger_budget_exhausted_total",
772 labels([("trigger_id", trigger_id), ("strategy", strategy)]),
773 1,
774 );
775 }
776
777 pub fn record_backpressure_event(&self, dimension: &str, action: &str) {
778 self.increment_counter(
779 "harn_backpressure_events_total",
780 labels([("dimension", dimension), ("action", action)]),
781 1,
782 );
783 }
784
785 pub fn record_event_log_append(
786 &self,
787 topic: &str,
788 duration: StdDuration,
789 payload_bytes: usize,
790 ) {
791 self.observe_histogram(
792 "harn_event_log_append_duration_seconds",
793 labels([("topic", topic)]),
794 duration.as_secs_f64(),
795 &Self::DURATION_BUCKETS,
796 );
797 self.set_gauge(
798 "harn_event_log_topic_size_bytes",
799 labels([("topic", topic)]),
800 payload_bytes as f64,
801 );
802 }
803
804 pub fn set_event_log_consumer_lag(&self, topic: &str, consumer: &str, lag: u64) {
805 self.set_gauge(
806 "harn_event_log_consumer_lag",
807 labels([("topic", topic), ("consumer", consumer)]),
808 lag as f64,
809 );
810 }
811
812 pub fn record_a2a_hop(&self, target: &str, outcome: &str, duration: StdDuration) {
813 self.increment_counter(
814 "harn_a2a_hops_total",
815 labels([("target", target), ("outcome", outcome)]),
816 1,
817 );
818 self.observe_histogram(
819 "harn_a2a_hop_duration_seconds",
820 labels([("target", target)]),
821 duration.as_secs_f64(),
822 &Self::DURATION_BUCKETS,
823 );
824 }
825
826 pub fn set_worker_queue_depth(&self, queue: &str, depth: u64) {
827 self.set_gauge(
828 "harn_worker_queue_depth",
829 labels([("queue", queue)]),
830 depth as f64,
831 );
832 }
833
834 pub fn record_worker_queue_claim_age(&self, queue: &str, age_seconds: f64) {
835 self.observe_histogram(
836 "harn_worker_queue_claim_age_seconds",
837 labels([("queue", queue)]),
838 age_seconds.max(0.0),
839 &Self::DURATION_BUCKETS,
840 );
841 }
842
843 pub fn set_orchestrator_pump_backlog(&self, topic: &str, count: u64) {
844 self.set_gauge(
845 "harn_orchestrator_pump_backlog",
846 labels([("topic", topic)]),
847 count as f64,
848 );
849 }
850
851 pub fn set_orchestrator_pump_outstanding(&self, topic: &str, count: usize) {
852 self.set_gauge(
853 "harn_orchestrator_pump_outstanding",
854 labels([("topic", topic)]),
855 count as f64,
856 );
857 }
858
859 pub fn record_orchestrator_pump_admission_delay(&self, topic: &str, duration: StdDuration) {
860 self.observe_histogram(
861 "harn_orchestrator_pump_admission_delay_seconds",
862 labels([("topic", topic)]),
863 duration.as_secs_f64(),
864 &Self::DURATION_BUCKETS,
865 );
866 }
867
868 pub fn record_llm_call(&self, provider: &str, model: &str, outcome: &str, cost_usd: f64) {
869 self.increment_counter(
870 "harn_llm_calls_total",
871 labels([
872 ("provider", provider),
873 ("model", model),
874 ("outcome", outcome),
875 ]),
876 1,
877 );
878 if cost_usd > 0.0 {
879 self.increment_counter(
880 "harn_llm_cost_usd_total",
881 labels([("provider", provider), ("model", model)]),
882 cost_usd,
883 );
884 } else {
885 self.ensure_counter(
886 "harn_llm_cost_usd_total",
887 labels([("provider", provider), ("model", model)]),
888 );
889 }
890 }
891
892 pub fn record_llm_cache_hit(&self, provider: &str) {
893 self.increment_counter(
894 "harn_llm_cache_hits_total",
895 labels([("provider", provider)]),
896 1,
897 );
898 }
899
900 pub fn render_prometheus(&self) -> String {
901 let snapshot = self.snapshot();
902 let counters = [
903 (
904 "connector_linear_timestamp_rejections_total",
905 snapshot.linear_timestamp_rejections_total,
906 ),
907 (
908 "dispatch_succeeded_total",
909 snapshot.dispatch_succeeded_total,
910 ),
911 ("dispatch_failed_total", snapshot.dispatch_failed_total),
912 ("inbox_duplicates_total", snapshot.inbox_duplicates_rejected),
913 ("retry_scheduled_total", snapshot.retry_scheduled_total),
914 (
915 "slack_events_delivery_success_total",
916 snapshot.slack_delivery_success_total,
917 ),
918 (
919 "slack_events_delivery_failure_total",
920 snapshot.slack_delivery_failure_total,
921 ),
922 ];
923
924 let mut rendered = String::new();
925 for (name, value) in counters {
926 rendered.push_str("# TYPE ");
927 rendered.push_str(name);
928 rendered.push_str(" counter\n");
929 rendered.push_str(name);
930 rendered.push(' ');
931 rendered.push_str(&value.to_string());
932 rendered.push('\n');
933 }
934 let custom_counters = self
935 .custom_counters
936 .lock()
937 .expect("custom counters poisoned");
938 for (name, value) in custom_counters.iter() {
939 let metric_name = format!(
940 "connector_custom_{}_total",
941 name.chars()
942 .map(|ch| if ch.is_ascii_alphanumeric() || ch == '_' {
943 ch
944 } else {
945 '_'
946 })
947 .collect::<String>()
948 );
949 rendered.push_str("# TYPE ");
950 rendered.push_str(&metric_name);
951 rendered.push_str(" counter\n");
952 rendered.push_str(&metric_name);
953 rendered.push(' ');
954 rendered.push_str(&value.to_string());
955 rendered.push('\n');
956 }
957 rendered.push_str("# TYPE slack_events_auto_disable_min_success_ratio gauge\n");
958 rendered.push_str("slack_events_auto_disable_min_success_ratio 0.05\n");
959 rendered.push_str("# TYPE slack_events_auto_disable_min_events_per_hour gauge\n");
960 rendered.push_str("slack_events_auto_disable_min_events_per_hour 1000\n");
961 self.render_generic_metrics(&mut rendered);
962 rendered
963 }
964
965 fn increment_counter(&self, name: &str, labels: MetricLabels, amount: impl Into<f64>) {
966 let amount = amount.into();
967 if amount <= 0.0 || !amount.is_finite() {
968 return;
969 }
970 let mut counters = self.counters.lock().expect("metrics counters poisoned");
971 *counters.entry((name.to_string(), labels)).or_default() += amount;
972 }
973
974 fn ensure_counter(&self, name: &str, labels: MetricLabels) {
975 let mut counters = self.counters.lock().expect("metrics counters poisoned");
976 counters.entry((name.to_string(), labels)).or_default();
977 }
978
979 fn set_gauge(&self, name: &str, labels: MetricLabels, value: f64) {
980 let mut gauges = self.gauges.lock().expect("metrics gauges poisoned");
981 gauges.insert((name.to_string(), labels), value);
982 }
983
984 fn observe_histogram(
985 &self,
986 name: &str,
987 labels: MetricLabels,
988 value: f64,
989 bucket_bounds: &[f64],
990 ) {
991 if !value.is_finite() {
992 return;
993 }
994 let mut histograms = self.histograms.lock().expect("metrics histograms poisoned");
995 let histogram = histograms
996 .entry((name.to_string(), labels))
997 .or_insert_with(|| HistogramMetric {
998 buckets: bucket_bounds
999 .iter()
1000 .map(|bound| (prometheus_float(*bound), 0))
1001 .chain(std::iter::once(("+Inf".to_string(), 0)))
1002 .collect(),
1003 count: 0,
1004 sum: 0.0,
1005 });
1006 histogram.count += 1;
1007 histogram.sum += value;
1008 for bound in bucket_bounds {
1009 if value <= *bound {
1010 let key = prometheus_float(*bound);
1011 *histogram.buckets.entry(key).or_default() += 1;
1012 }
1013 }
1014 *histogram.buckets.entry("+Inf".to_string()).or_default() += 1;
1015 }
1016
1017 fn refresh_oldest_pending_gauge(&self, labels: MetricLabels, now_ms: i64) {
1018 let oldest_accepted_at_ms = self
1019 .pending_trigger_events
1020 .lock()
1021 .expect("pending trigger events poisoned")
1022 .get(&labels)
1023 .and_then(|events| events.values().min().copied());
1024 let age_seconds = oldest_accepted_at_ms
1025 .map(|accepted_at_ms| millis_delta(now_ms, accepted_at_ms).as_secs_f64())
1026 .unwrap_or(0.0);
1027 self.set_gauge(
1028 "harn_trigger_oldest_pending_age_seconds",
1029 labels,
1030 age_seconds,
1031 );
1032 }
1033
1034 fn render_generic_metrics(&self, rendered: &mut String) {
1035 let counters = self
1036 .counters
1037 .lock()
1038 .expect("metrics counters poisoned")
1039 .clone();
1040 let gauges = self.gauges.lock().expect("metrics gauges poisoned").clone();
1041 let histograms = self
1042 .histograms
1043 .lock()
1044 .expect("metrics histograms poisoned")
1045 .clone();
1046
1047 for name in metric_family_names(MetricKind::Counter) {
1048 rendered.push_str("# TYPE ");
1049 rendered.push_str(name);
1050 rendered.push_str(" counter\n");
1051 for ((sample_name, labels), value) in counters.iter().filter(|((n, _), _)| n == name) {
1052 render_sample(rendered, sample_name, labels, *value);
1053 }
1054 }
1055 for name in metric_family_names(MetricKind::Gauge) {
1056 rendered.push_str("# TYPE ");
1057 rendered.push_str(name);
1058 rendered.push_str(" gauge\n");
1059 for ((sample_name, labels), value) in gauges.iter().filter(|((n, _), _)| n == name) {
1060 render_sample(rendered, sample_name, labels, *value);
1061 }
1062 }
1063 for name in metric_family_names(MetricKind::Histogram) {
1064 rendered.push_str("# TYPE ");
1065 rendered.push_str(name);
1066 rendered.push_str(" histogram\n");
1067 for ((sample_name, labels), histogram) in
1068 histograms.iter().filter(|((n, _), _)| n == name)
1069 {
1070 for (le, value) in &histogram.buckets {
1071 let mut bucket_labels = labels.clone();
1072 bucket_labels.insert("le".to_string(), le.clone());
1073 render_sample(
1074 rendered,
1075 &format!("{sample_name}_bucket"),
1076 &bucket_labels,
1077 *value as f64,
1078 );
1079 }
1080 render_sample(
1081 rendered,
1082 &format!("{sample_name}_sum"),
1083 labels,
1084 histogram.sum,
1085 );
1086 render_sample(
1087 rendered,
1088 &format!("{sample_name}_count"),
1089 labels,
1090 histogram.count as f64,
1091 );
1092 }
1093 }
1094 }
1095}
1096
/// Kind of labelled metric family; selects which name table
/// `metric_family_names` returns.
#[derive(Clone, Copy)]
enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}
1103
1104fn metric_family_names(kind: MetricKind) -> &'static [&'static str] {
1105 match kind {
1106 MetricKind::Counter => &[
1107 "harn_http_requests_total",
1108 "harn_trigger_received_total",
1109 "harn_trigger_deduped_total",
1110 "harn_trigger_predicate_evaluations_total",
1111 "harn_trigger_dispatched_total",
1112 "harn_trigger_retries_total",
1113 "harn_trigger_dlq_total",
1114 "harn_trigger_budget_exhausted_total",
1115 "harn_backpressure_events_total",
1116 "harn_a2a_hops_total",
1117 "harn_llm_calls_total",
1118 "harn_llm_cost_usd_total",
1119 "harn_llm_cache_hits_total",
1120 ],
1121 MetricKind::Gauge => &[
1122 "harn_trigger_inflight",
1123 "harn_event_log_topic_size_bytes",
1124 "harn_event_log_consumer_lag",
1125 "harn_trigger_budget_cost_today_usd",
1126 "harn_worker_queue_depth",
1127 "harn_orchestrator_pump_backlog",
1128 "harn_orchestrator_pump_outstanding",
1129 "harn_trigger_oldest_pending_age_seconds",
1130 ],
1131 MetricKind::Histogram => &[
1132 "harn_http_request_duration_seconds",
1133 "harn_http_body_size_bytes",
1134 "harn_trigger_predicate_cost_usd",
1135 "harn_event_log_append_duration_seconds",
1136 "harn_a2a_hop_duration_seconds",
1137 "harn_worker_queue_claim_age_seconds",
1138 "harn_orchestrator_pump_admission_delay_seconds",
1139 "harn_trigger_webhook_accepted_to_normalized_seconds",
1140 "harn_trigger_webhook_accepted_to_queue_append_seconds",
1141 "harn_trigger_queue_age_at_dispatch_admission_seconds",
1142 "harn_trigger_queue_age_at_dispatch_start_seconds",
1143 "harn_trigger_dispatch_runtime_seconds",
1144 "harn_trigger_retry_delay_seconds",
1145 "harn_trigger_accepted_to_dlq_seconds",
1146 ],
1147 }
1148}
1149
1150fn labels<const N: usize>(pairs: [(&str, &str); N]) -> MetricLabels {
1151 pairs
1152 .into_iter()
1153 .map(|(name, value)| (name.to_string(), value.to_string()))
1154 .collect()
1155}
1156
1157fn trigger_lifecycle_labels(
1158 trigger_id: &str,
1159 binding_key: &str,
1160 provider: &str,
1161 tenant_id: Option<&str>,
1162 status: &str,
1163) -> MetricLabels {
1164 labels([
1165 ("binding_key", binding_key),
1166 ("provider", provider),
1167 ("status", status),
1168 ("tenant_id", tenant_label(tenant_id)),
1169 ("trigger_id", trigger_id),
1170 ])
1171}
1172
1173fn trigger_pending_labels(
1174 trigger_id: &str,
1175 binding_key: &str,
1176 provider: &str,
1177 tenant_id: Option<&str>,
1178) -> MetricLabels {
1179 labels([
1180 ("binding_key", binding_key),
1181 ("provider", provider),
1182 ("tenant_id", tenant_label(tenant_id)),
1183 ("trigger_id", trigger_id),
1184 ])
1185}
1186
/// Normalizes an optional tenant id into a label value.
///
/// Returns the trimmed id when it is present and non-empty after trimming,
/// otherwise the literal `"none"`.
fn tenant_label(tenant_id: Option<&str>) -> &str {
    match tenant_id.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed,
        _ => "none",
    }
}
1193
/// Returns `later_ms - earlier_ms` as a duration in milliseconds.
///
/// The subtraction saturates instead of overflowing, and a negative
/// difference (when `later_ms` precedes `earlier_ms`) yields a zero duration.
fn millis_delta(later_ms: i64, earlier_ms: i64) -> StdDuration {
    let difference = later_ms.saturating_sub(earlier_ms);
    // A negative difference fails the conversion and collapses to zero.
    let millis = u64::try_from(difference).unwrap_or(0);
    StdDuration::from_millis(millis)
}
1197
1198fn render_sample(rendered: &mut String, name: &str, labels: &MetricLabels, value: f64) {
1199 rendered.push_str(name);
1200 if !labels.is_empty() {
1201 rendered.push('{');
1202 for (index, (label, label_value)) in labels.iter().enumerate() {
1203 if index > 0 {
1204 rendered.push(',');
1205 }
1206 rendered.push_str(label);
1207 rendered.push_str("=\"");
1208 rendered.push_str(&escape_label_value(label_value));
1209 rendered.push('"');
1210 }
1211 rendered.push('}');
1212 }
1213 rendered.push(' ');
1214 rendered.push_str(&prometheus_float(value));
1215 rendered.push('\n');
1216}
1217
/// Escapes a label value for inclusion between double quotes.
///
/// Backslash becomes `\\`, double quote becomes `\"`, and a line feed becomes
/// the two characters `\n`; every other character is copied unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1229
/// Formats a sample value for the Prometheus text exposition format.
///
/// * NaN renders as `NaN`, positive infinity as `+Inf` and negative infinity
///   as `-Inf` (the spellings used by the exposition format).
/// * Values with no fractional part render without a decimal point.
/// * Other values render with at most six decimal places, with trailing
///   zeros (and a dangling decimal point) removed.
fn prometheus_float(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        // Negative infinity previously fell through to Rust's default
        // formatting and came out as `-inf`.
        let rendered = if value.is_sign_positive() {
            "+Inf"
        } else {
            "-Inf"
        };
        return rendered.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        let rendered = format!("{value:.6}");
        rendered
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
1244
/// Payload schema descriptor for a provider: a Harn schema name plus a
/// JSON Schema value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProviderPayloadSchema {
    /// Name of the Harn schema.
    pub harn_schema_name: String,
    /// JSON Schema value; takes `JsonValue`'s default when the field is
    /// absent from deserialized input.
    #[serde(default)]
    pub json_schema: JsonValue,
}
1252
1253impl ProviderPayloadSchema {
1254 pub fn new(harn_schema_name: impl Into<String>, json_schema: JsonValue) -> Self {
1255 Self {
1256 harn_schema_name: harn_schema_name.into(),
1257 json_schema,
1258 }
1259 }
1260
1261 pub fn named(harn_schema_name: impl Into<String>) -> Self {
1262 Self::new(harn_schema_name, JsonValue::Null)
1263 }
1264}
1265
1266impl Default for ProviderPayloadSchema {
1267 fn default() -> Self {
1268 Self::named("raw")
1269 }
1270}
1271
/// String newtype naming a kind of trigger.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerKind(String);
1276
1277impl TriggerKind {
1278 pub fn new(value: impl Into<String>) -> Self {
1279 Self(value.into())
1280 }
1281
1282 pub fn as_str(&self) -> &str {
1283 self.0.as_str()
1284 }
1285}
1286
1287impl From<&str> for TriggerKind {
1288 fn from(value: &str) -> Self {
1289 Self::new(value)
1290 }
1291}
1292
1293impl From<String> for TriggerKind {
1294 fn from(value: String) -> Self {
1295 Self::new(value)
1296 }
1297}
1298
/// One trigger binding: a provider, a trigger kind and a binding id,
/// together with dedupe settings and a free-form JSON config.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBinding {
    /// Provider this binding belongs to; `TriggerRegistry` groups bindings by it.
    pub provider: ProviderId,
    /// Kind of trigger.
    pub kind: TriggerKind,
    /// Identifier of the binding.
    pub binding_id: String,
    /// Optional dedupe key; `None` when absent from deserialized input.
    #[serde(default)]
    pub dedupe_key: Option<String>,
    /// Dedupe retention in days; filled from `default_dedupe_retention_days`
    /// when absent from deserialized input.
    #[serde(default = "default_dedupe_retention_days")]
    pub dedupe_retention_days: u32,
    /// Binding configuration; takes `JsonValue`'s default when absent from
    /// deserialized input.
    #[serde(default)]
    pub config: JsonValue,
}
1312
1313impl TriggerBinding {
1314 pub fn new(
1315 provider: ProviderId,
1316 kind: impl Into<TriggerKind>,
1317 binding_id: impl Into<String>,
1318 ) -> Self {
1319 Self {
1320 provider,
1321 kind: kind.into(),
1322 binding_id: binding_id.into(),
1323 dedupe_key: None,
1324 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1325 config: JsonValue::Null,
1326 }
1327 }
1328}
1329
1330fn default_dedupe_retention_days() -> u32 {
1331 crate::triggers::DEFAULT_INBOX_RETENTION_DAYS
1332}
1333
/// Collection of trigger bindings grouped by provider.
#[derive(Clone, Debug, Default)]
pub struct TriggerRegistry {
    /// Bindings per provider, in registration order within each provider.
    bindings: BTreeMap<ProviderId, Vec<TriggerBinding>>,
}
1339
1340impl TriggerRegistry {
1341 pub fn register(&mut self, binding: TriggerBinding) {
1342 self.bindings
1343 .entry(binding.provider.clone())
1344 .or_default()
1345 .push(binding);
1346 }
1347
1348 pub fn bindings(&self) -> &BTreeMap<ProviderId, Vec<TriggerBinding>> {
1349 &self.bindings
1350 }
1351
1352 pub fn bindings_for(&self, provider: &ProviderId) -> &[TriggerBinding] {
1353 self.bindings
1354 .get(provider)
1355 .map(Vec::as_slice)
1356 .unwrap_or(&[])
1357 }
1358}
1359
/// Result of activating a connector: the provider and how many bindings
/// were passed to it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActivationHandle {
    /// Provider whose connector was activated.
    pub provider: ProviderId,
    /// Number of bindings recorded for the activation.
    pub binding_count: usize,
}
1366
1367impl ActivationHandle {
1368 pub fn new(provider: ProviderId, binding_count: usize) -> Self {
1369 Self {
1370 provider,
1371 binding_count,
1372 }
1373 }
1374}
1375
/// A raw inbound message handed to `Connector::normalize_inbound`.
#[derive(Clone, Debug, PartialEq)]
pub struct RawInbound {
    /// Kind label supplied by the caller.
    pub kind: String,
    /// Header name/value pairs.
    pub headers: BTreeMap<String, String>,
    /// Query name/value pairs; empty when built with `RawInbound::new`.
    pub query: BTreeMap<String, String>,
    /// Raw body bytes; `json_body` parses these as JSON.
    pub body: Vec<u8>,
    /// Receipt time; `RawInbound::new` sets it from `clock::now_utc()`.
    pub received_at: OffsetDateTime,
    /// Optional occurrence time; `None` when built with `RawInbound::new`.
    pub occurred_at: Option<OffsetDateTime>,
    /// Optional tenant; `None` when built with `RawInbound::new`.
    pub tenant_id: Option<TenantId>,
    /// Extra metadata; `JsonValue::Null` when built with `RawInbound::new`.
    pub metadata: JsonValue,
}
1388
1389impl RawInbound {
1390 pub fn new(kind: impl Into<String>, headers: BTreeMap<String, String>, body: Vec<u8>) -> Self {
1391 Self {
1392 kind: kind.into(),
1393 headers,
1394 query: BTreeMap::new(),
1395 body,
1396 received_at: clock::now_utc(),
1397 occurred_at: None,
1398 tenant_id: None,
1399 metadata: JsonValue::Null,
1400 }
1401 }
1402
1403 pub fn json_body(&self) -> Result<JsonValue, ConnectorError> {
1404 Ok(serde_json::from_slice(&self.body)?)
1405 }
1406}
1407
/// Token-bucket settings used by `RateLimiterFactory`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Token count a bucket starts with and the ceiling it refills to
    /// (the refill ceiling is treated as at least 1).
    pub capacity: u32,
    /// Tokens added per `refill_interval` (treated as at least 1).
    pub refill_tokens: u32,
    /// Interval over which `refill_tokens` tokens are added.
    pub refill_interval: StdDuration,
}
1415
1416impl Default for RateLimitConfig {
1417 fn default() -> Self {
1418 Self {
1419 capacity: 60,
1420 refill_tokens: 1,
1421 refill_interval: StdDuration::from_secs(1),
1422 }
1423 }
1424}
1425
/// State of one token bucket.
#[derive(Clone, Debug)]
struct TokenBucket {
    /// Tokens currently available; fractional because refill is proportional
    /// to elapsed time.
    tokens: f64,
    /// Instant of the most recent refill.
    last_refill: ClockInstant,
}
1431
1432impl TokenBucket {
1433 fn full(config: RateLimitConfig) -> Self {
1434 Self {
1435 tokens: config.capacity as f64,
1436 last_refill: clock::instant_now(),
1437 }
1438 }
1439
1440 fn refill(&mut self, config: RateLimitConfig, now: ClockInstant) {
1441 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1442 let rate = config.refill_tokens.max(1) as f64 / interval;
1443 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
1444 self.tokens = (self.tokens + elapsed * rate).min(config.capacity.max(1) as f64);
1445 self.last_refill = now;
1446 }
1447
1448 fn try_acquire(&mut self, config: RateLimitConfig, now: ClockInstant) -> bool {
1449 self.refill(config, now);
1450 if self.tokens >= 1.0 {
1451 self.tokens -= 1.0;
1452 true
1453 } else {
1454 false
1455 }
1456 }
1457
1458 fn wait_duration(&self, config: RateLimitConfig) -> StdDuration {
1459 if self.tokens >= 1.0 {
1460 return StdDuration::ZERO;
1461 }
1462 let interval = config.refill_interval.as_secs_f64().max(f64::EPSILON);
1463 let rate = config.refill_tokens.max(1) as f64 / interval;
1464 let missing = (1.0 - self.tokens).max(0.0);
1465 StdDuration::from_secs_f64((missing / rate).max(0.001))
1466 }
1467}
1468
/// Hands out token-bucket rate limiting keyed by `(provider, key)`; all
/// buckets share one `RateLimitConfig`.
#[derive(Debug)]
pub struct RateLimiterFactory {
    /// Settings applied to every bucket.
    config: RateLimitConfig,
    /// Buckets keyed by `(provider id string, key)`, created on first use.
    buckets: Mutex<HashMap<(String, String), TokenBucket>>,
}
1475
1476impl RateLimiterFactory {
1477 pub fn new(config: RateLimitConfig) -> Self {
1478 Self {
1479 config,
1480 buckets: Mutex::new(HashMap::new()),
1481 }
1482 }
1483
1484 pub fn config(&self) -> RateLimitConfig {
1485 self.config
1486 }
1487
1488 pub fn scoped(&self, provider: &ProviderId, key: impl Into<String>) -> ScopedRateLimiter<'_> {
1489 ScopedRateLimiter {
1490 factory: self,
1491 provider: provider.clone(),
1492 key: key.into(),
1493 }
1494 }
1495
1496 pub fn try_acquire(&self, provider: &ProviderId, key: &str) -> bool {
1497 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1498 let bucket = buckets
1499 .entry((provider.as_str().to_string(), key.to_string()))
1500 .or_insert_with(|| TokenBucket::full(self.config));
1501 bucket.try_acquire(self.config, clock::instant_now())
1502 }
1503
1504 pub async fn acquire(&self, provider: &ProviderId, key: &str) {
1505 loop {
1506 let wait = {
1507 let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned");
1508 let bucket = buckets
1509 .entry((provider.as_str().to_string(), key.to_string()))
1510 .or_insert_with(|| TokenBucket::full(self.config));
1511 if bucket.try_acquire(self.config, clock::instant_now()) {
1512 return;
1513 }
1514 bucket.wait_duration(self.config)
1515 };
1516 tokio::time::sleep(wait).await;
1517 }
1518 }
1519}
1520
1521impl Default for RateLimiterFactory {
1522 fn default() -> Self {
1523 Self::new(RateLimitConfig::default())
1524 }
1525}
1526
/// A view of a `RateLimiterFactory` fixed to one provider and key, as
/// returned by `RateLimiterFactory::scoped`.
#[derive(Clone, Debug)]
pub struct ScopedRateLimiter<'a> {
    /// Factory that owns the bucket.
    factory: &'a RateLimiterFactory,
    /// Provider part of the bucket key.
    provider: ProviderId,
    /// Caller-chosen part of the bucket key.
    key: String,
}
1534
1535impl<'a> ScopedRateLimiter<'a> {
1536 pub fn try_acquire(&self) -> bool {
1537 self.factory.try_acquire(&self.provider, &self.key)
1538 }
1539
1540 pub async fn acquire(&self) {
1541 self.factory.acquire(&self.provider, &self.key).await;
1542 }
1543}
1544
/// Registry of connectors, at most one per provider.
pub struct ConnectorRegistry {
    /// Connector handles keyed by provider id.
    connectors: BTreeMap<ProviderId, ConnectorHandle>,
}
1549
1550impl ConnectorRegistry {
1551 pub fn empty() -> Self {
1552 Self {
1553 connectors: BTreeMap::new(),
1554 }
1555 }
1556
1557 pub fn with_defaults() -> Self {
1558 let mut registry = Self::empty();
1559 for provider in registered_provider_metadata() {
1560 registry
1561 .register(default_connector_for_provider(&provider))
1562 .expect("default connector registration should not fail");
1563 }
1564 registry
1565 }
1566
1567 pub fn register(&mut self, connector: Box<dyn Connector>) -> Result<(), ConnectorError> {
1568 let provider = connector.provider_id().clone();
1569 if self.connectors.contains_key(&provider) {
1570 return Err(ConnectorError::DuplicateProvider(provider.0));
1571 }
1572 self.connectors
1573 .insert(provider, Arc::new(AsyncMutex::new(connector)));
1574 Ok(())
1575 }
1576
1577 pub fn get(&self, id: &ProviderId) -> Option<ConnectorHandle> {
1578 self.connectors.get(id).cloned()
1579 }
1580
1581 pub fn remove(&mut self, id: &ProviderId) -> Option<ConnectorHandle> {
1582 self.connectors.remove(id)
1583 }
1584
1585 pub fn list(&self) -> Vec<ProviderId> {
1586 self.connectors.keys().cloned().collect()
1587 }
1588
1589 pub async fn init_all(&self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1590 for connector in self.connectors.values() {
1591 connector.lock().await.init(ctx.clone()).await?;
1592 }
1593 Ok(())
1594 }
1595
1596 pub async fn client_map(&self) -> BTreeMap<ProviderId, Arc<dyn ConnectorClient>> {
1597 let mut clients = BTreeMap::new();
1598 for (provider, connector) in &self.connectors {
1599 let client = connector.lock().await.client();
1600 clients.insert(provider.clone(), client);
1601 }
1602 clients
1603 }
1604
1605 pub async fn activate_all(
1606 &self,
1607 registry: &TriggerRegistry,
1608 ) -> Result<Vec<ActivationHandle>, ConnectorError> {
1609 let mut handles = Vec::new();
1610 for (provider, connector) in &self.connectors {
1611 let bindings = registry.bindings_for(provider);
1612 if bindings.is_empty() {
1613 continue;
1614 }
1615 let connector = connector.lock().await;
1616 handles.push(connector.activate(bindings).await?);
1617 }
1618 Ok(handles)
1619 }
1620}
1621
1622impl Default for ConnectorRegistry {
1623 fn default() -> Self {
1624 Self::with_defaults()
1625 }
1626}
1627
1628fn default_connector_for_provider(provider: &ProviderMetadata) -> Box<dyn Connector> {
1629 if provider.provider == "github" {
1639 return Box::new(GitHubConnector::new());
1640 }
1641 if provider.provider == "linear" {
1642 return Box::new(LinearConnector::new());
1643 }
1644 if provider.provider == "slack" {
1645 return Box::new(SlackConnector::new());
1646 }
1647 if provider.provider == "notion" {
1648 return Box::new(NotionConnector::new());
1649 }
1650 if provider.provider == "a2a-push" {
1651 return Box::new(A2aPushConnector::new());
1652 }
1653 match &provider.runtime {
1654 ProviderRuntimeMetadata::Builtin {
1655 connector,
1656 default_signature_variant,
1657 } => match connector.as_str() {
1658 "cron" => Box::new(CronConnector::new()),
1659 "stream" => Box::new(StreamConnector::new(
1660 ProviderId::from(provider.provider.clone()),
1661 provider.schema_name.clone(),
1662 )),
1663 "webhook" => {
1664 let variant = WebhookSignatureVariant::parse(default_signature_variant.as_deref())
1665 .expect("catalog webhook signature variant must be valid");
1666 Box::new(GenericWebhookConnector::with_profile(
1667 WebhookProviderProfile::new(
1668 ProviderId::from(provider.provider.clone()),
1669 provider.schema_name.clone(),
1670 variant,
1671 ),
1672 ))
1673 }
1674 _ => Box::new(PlaceholderConnector::from_metadata(provider)),
1675 },
1676 ProviderRuntimeMetadata::Placeholder => {
1677 Box::new(PlaceholderConnector::from_metadata(provider))
1678 }
1679 }
1680}
1681
/// Connector for providers that are cataloged but have no concrete inbound
/// connector; its `normalize_inbound` always returns an `Unsupported` error.
struct PlaceholderConnector {
    /// Provider this placeholder stands in for.
    provider_id: ProviderId,
    /// Trigger kinds copied from the provider metadata.
    kinds: Vec<TriggerKind>,
    /// Schema name copied from the provider metadata.
    schema_name: String,
}
1687
1688impl PlaceholderConnector {
1689 fn from_metadata(metadata: &ProviderMetadata) -> Self {
1690 Self {
1691 provider_id: ProviderId::from(metadata.provider.clone()),
1692 kinds: metadata
1693 .kinds
1694 .iter()
1695 .cloned()
1696 .map(TriggerKind::from)
1697 .collect(),
1698 schema_name: metadata.schema_name.clone(),
1699 }
1700 }
1701}
1702
1703struct PlaceholderClient;
1704
1705#[async_trait]
1706impl ConnectorClient for PlaceholderClient {
1707 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
1708 Err(ClientError::Other(format!(
1709 "connector client method '{method}' is not implemented for this provider"
1710 )))
1711 }
1712}
1713
1714#[async_trait]
1715impl Connector for PlaceholderConnector {
1716 fn provider_id(&self) -> &ProviderId {
1717 &self.provider_id
1718 }
1719
1720 fn kinds(&self) -> &[TriggerKind] {
1721 &self.kinds
1722 }
1723
1724 async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
1725 Ok(())
1726 }
1727
1728 async fn activate(
1729 &self,
1730 bindings: &[TriggerBinding],
1731 ) -> Result<ActivationHandle, ConnectorError> {
1732 Ok(ActivationHandle::new(
1733 self.provider_id.clone(),
1734 bindings.len(),
1735 ))
1736 }
1737
1738 async fn normalize_inbound(&self, _raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
1739 Err(ConnectorError::Unsupported(format!(
1740 "provider '{}' is cataloged but does not have a concrete inbound connector yet",
1741 self.provider_id.as_str()
1742 )))
1743 }
1744
1745 fn payload_schema(&self) -> ProviderPayloadSchema {
1746 ProviderPayloadSchema::named(self.schema_name.clone())
1747 }
1748
1749 fn client(&self) -> Arc<dyn ConnectorClient> {
1750 Arc::new(PlaceholderClient)
1751 }
1752}
1753
1754pub fn install_active_connector_clients(clients: BTreeMap<ProviderId, Arc<dyn ConnectorClient>>) {
1755 ACTIVE_CONNECTOR_CLIENTS.with(|slot| {
1756 *slot.borrow_mut() = clients
1757 .into_iter()
1758 .map(|(provider, client)| (provider.as_str().to_string(), client))
1759 .collect();
1760 });
1761}
1762
1763pub fn active_connector_client(provider: &str) -> Option<Arc<dyn ConnectorClient>> {
1764 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow().get(provider).cloned())
1765}
1766
1767pub fn clear_active_connector_clients() {
1768 ACTIVE_CONNECTOR_CLIENTS.with(|slot| slot.borrow_mut().clear());
1769}
1770
// Unit tests for the connector registry, the rate limiter, `RawInbound`, and
// the Prometheus rendering of `MetricsRegistry`.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicUsize, Ordering};

    use async_trait::async_trait;
    use serde_json::json;

    /// Client stub that echoes the called method name back as JSON.
    struct NoopClient;

    #[async_trait]
    impl ConnectorClient for NoopClient {
        async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
            Ok(json!({ "method": method }))
        }
    }

    /// Connector stub that counts `activate` calls through a shared counter.
    struct FakeConnector {
        provider_id: ProviderId,
        kinds: Vec<TriggerKind>,
        activate_calls: Arc<AtomicUsize>,
    }

    impl FakeConnector {
        /// Creates a fake connector for `provider_id` with the single kind `webhook`.
        fn new(provider_id: &str, activate_calls: Arc<AtomicUsize>) -> Self {
            Self {
                provider_id: ProviderId::from(provider_id),
                kinds: vec![TriggerKind::from("webhook")],
                activate_calls,
            }
        }
    }

    #[async_trait]
    impl Connector for FakeConnector {
        fn provider_id(&self) -> &ProviderId {
            &self.provider_id
        }

        fn kinds(&self) -> &[TriggerKind] {
            &self.kinds
        }

        async fn init(&mut self, _ctx: ConnectorCtx) -> Result<(), ConnectorError> {
            Ok(())
        }

        // Bumps the shared counter so tests can observe which connectors were activated.
        async fn activate(
            &self,
            bindings: &[TriggerBinding],
        ) -> Result<ActivationHandle, ConnectorError> {
            self.activate_calls.fetch_add(1, Ordering::SeqCst);
            Ok(ActivationHandle::new(
                self.provider_id.clone(),
                bindings.len(),
            ))
        }

        async fn normalize_inbound(
            &self,
            _raw: RawInbound,
        ) -> Result<TriggerEvent, ConnectorError> {
            Err(ConnectorError::Unsupported(
                "not needed for registry tests".to_string(),
            ))
        }

        fn payload_schema(&self) -> ProviderPayloadSchema {
            ProviderPayloadSchema::named("FakePayload")
        }

        fn client(&self) -> Arc<dyn ConnectorClient> {
            Arc::new(NoopClient)
        }
    }

    // Registering a second connector for the same provider must fail with
    // `DuplicateProvider`.
    #[tokio::test]
    async fn connector_registry_rejects_duplicate_providers() {
        let activate_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new(
                "github",
                activate_calls.clone(),
            )))
            .unwrap();

        let error = registry
            .register(Box::new(FakeConnector::new("github", activate_calls)))
            .unwrap_err();
        assert!(matches!(
            error,
            ConnectorError::DuplicateProvider(provider) if provider == "github"
        ));
    }

    // Only connectors with at least one binding are activated; here `github`
    // has two bindings and `slack` has none.
    #[tokio::test]
    async fn connector_registry_activates_only_bound_connectors() {
        let github_calls = Arc::new(AtomicUsize::new(0));
        let slack_calls = Arc::new(AtomicUsize::new(0));
        let mut registry = ConnectorRegistry::empty();
        registry
            .register(Box::new(FakeConnector::new("github", github_calls.clone())))
            .unwrap();
        registry
            .register(Box::new(FakeConnector::new("slack", slack_calls.clone())))
            .unwrap();

        let mut trigger_registry = TriggerRegistry::default();
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.push",
        ));
        trigger_registry.register(TriggerBinding::new(
            ProviderId::from("github"),
            "webhook",
            "github.installation",
        ));

        let handles = registry.activate_all(&trigger_registry).await.unwrap();
        assert_eq!(handles.len(), 1);
        assert_eq!(handles[0].provider.as_str(), "github");
        assert_eq!(handles[0].binding_count, 2);
        assert_eq!(github_calls.load(Ordering::SeqCst), 1);
        assert_eq!(slack_calls.load(Ordering::SeqCst), 0);
    }

    // With capacity 1 and a 60-second refill, a second acquire on the same
    // (provider, key) fails while other keys and providers still succeed.
    #[test]
    fn rate_limiter_scopes_tokens_by_provider_and_key() {
        let factory = RateLimiterFactory::new(RateLimitConfig {
            capacity: 1,
            refill_tokens: 1,
            refill_interval: StdDuration::from_secs(60),
        });

        assert!(factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(!factory.try_acquire(&ProviderId::from("github"), "org:1"));
        assert!(factory.try_acquire(&ProviderId::from("github"), "org:2"));
        assert!(factory.try_acquire(&ProviderId::from("slack"), "org:1"));
    }

    // `json_body` parses the stored body bytes as JSON.
    #[test]
    fn raw_inbound_json_body_preserves_raw_bytes() {
        let raw = RawInbound::new(
            "push",
            BTreeMap::from([("Content-Type".to_string(), "application/json".to_string())]),
            br#"{"ok":true}"#.to_vec(),
        );

        assert_eq!(raw.json_body().unwrap(), json!({ "ok": true }));
    }

    // The default registry must include the `cron`, `github` and `webhook` providers.
    #[test]
    fn connector_registry_lists_catalog_providers() {
        let registry = ConnectorRegistry::default();
        let providers = registry.list();
        assert!(providers.contains(&ProviderId::from("cron")));
        assert!(providers.contains(&ProviderId::from("github")));
        assert!(providers.contains(&ProviderId::from("webhook")));
    }

    // Records a sample through each `MetricsRegistry` recording method used
    // below, then checks that the rendered Prometheus text contains every
    // expected sample line.
    #[test]
    fn metrics_registry_exports_orchestrator_metric_families() {
        let metrics = MetricsRegistry::default();
        metrics.record_http_request(
            "/triggers/github",
            "POST",
            200,
            StdDuration::from_millis(25),
            512,
        );
        metrics.record_trigger_received("github-new-issue", "github");
        metrics.record_trigger_deduped("github-new-issue", "inbox_duplicate");
        metrics.record_trigger_predicate_evaluation("github-new-issue", true, 0.002);
        metrics.record_trigger_dispatched("github-new-issue", "local", "succeeded");
        metrics.record_trigger_retry("github-new-issue", 2);
        metrics.record_trigger_dlq("github-new-issue", "retry_exhausted");
        metrics.set_trigger_inflight("github-new-issue", 0);
        metrics.record_event_log_append(
            "orchestrator.triggers.pending",
            StdDuration::from_millis(1),
            2048,
        );
        metrics.set_event_log_consumer_lag("orchestrator.triggers.pending", "orchestrator-pump", 0);
        metrics.set_trigger_budget_cost_today("github-new-issue", 0.002);
        metrics.record_trigger_budget_exhausted("github-new-issue", "daily_budget_exceeded");
        metrics.record_a2a_hop("agent.example", "succeeded", StdDuration::from_millis(10));
        metrics.set_worker_queue_depth("triage", 1);
        metrics.record_worker_queue_claim_age("triage", 3.0);
        metrics.set_orchestrator_pump_backlog("trigger.inbox.envelopes", 2);
        metrics.set_orchestrator_pump_outstanding("trigger.inbox.envelopes", 1);
        metrics.record_orchestrator_pump_admission_delay(
            "trigger.inbox.envelopes",
            StdDuration::from_millis(50),
        );
        metrics.record_trigger_accepted_to_normalized(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "normalized",
            StdDuration::from_millis(25),
        );
        metrics.record_trigger_accepted_to_queue_append(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "queued",
            StdDuration::from_millis(40),
        );
        metrics.record_trigger_queue_age_at_dispatch_admission(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "admitted",
            StdDuration::from_millis(75),
        );
        metrics.record_trigger_queue_age_at_dispatch_start(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "started",
            StdDuration::from_millis(125),
        );
        metrics.record_trigger_dispatch_runtime(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "succeeded",
            StdDuration::from_millis(250),
        );
        metrics.record_trigger_retry_delay(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "scheduled",
            StdDuration::from_secs(2),
        );
        metrics.record_trigger_accepted_to_dlq(
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            "retry_exhausted",
            StdDuration::from_secs(45),
        );
        metrics.record_backpressure_event("ingest", "reject");
        metrics.note_trigger_pending_event(
            "evt-1",
            "github-new-issue",
            "github-new-issue@v7",
            "github",
            Some("tenant-a"),
            1_000,
            4_000,
        );
        metrics.record_llm_call("mock", "mock", "succeeded", 0.01);
        metrics.record_llm_cache_hit("mock");

        let rendered = metrics.render_prometheus();
        // Each needle is one complete expected sample line (name, labels, value).
        for needle in [
            "harn_http_requests_total{endpoint=\"/triggers/github\",method=\"POST\",status=\"200\"} 1",
            "harn_http_request_duration_seconds_bucket{endpoint=\"/triggers/github\",le=\"0.05\"} 1",
            "harn_http_body_size_bytes_bucket{endpoint=\"/triggers/github\",le=\"512\"} 1",
            "harn_trigger_received_total{provider=\"github\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_deduped_total{reason=\"inbox_duplicate\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_evaluations_total{result=\"true\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_predicate_cost_usd_bucket{le=\"0.01\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatched_total{handler_kind=\"local\",outcome=\"succeeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retries_total{attempt=\"2\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dlq_total{reason=\"retry_exhausted\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_inflight{trigger_id=\"github-new-issue\"} 0",
            "harn_event_log_append_duration_seconds_bucket{le=\"0.005\",topic=\"orchestrator.triggers.pending\"} 1",
            "harn_event_log_topic_size_bytes{topic=\"orchestrator.triggers.pending\"} 2048",
            "harn_event_log_consumer_lag{consumer=\"orchestrator-pump\",topic=\"orchestrator.triggers.pending\"} 0",
            "harn_trigger_budget_cost_today_usd{trigger_id=\"github-new-issue\"} 0.002",
            "harn_trigger_budget_exhausted_total{strategy=\"daily_budget_exceeded\",trigger_id=\"github-new-issue\"} 1",
            "harn_backpressure_events_total{action=\"reject\",dimension=\"ingest\"} 1",
            "harn_a2a_hops_total{outcome=\"succeeded\",target=\"agent.example\"} 1",
            "harn_a2a_hop_duration_seconds_bucket{le=\"0.01\",target=\"agent.example\"} 1",
            "harn_worker_queue_depth{queue=\"triage\"} 1",
            "harn_worker_queue_claim_age_seconds_bucket{le=\"5\",queue=\"triage\"} 1",
            "harn_orchestrator_pump_backlog{topic=\"trigger.inbox.envelopes\"} 2",
            "harn_orchestrator_pump_outstanding{topic=\"trigger.inbox.envelopes\"} 1",
            "harn_orchestrator_pump_admission_delay_seconds_bucket{le=\"0.05\",topic=\"trigger.inbox.envelopes\"} 1",
            "harn_trigger_webhook_accepted_to_normalized_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.025\",provider=\"github\",status=\"normalized\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_webhook_accepted_to_queue_append_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.05\",provider=\"github\",status=\"queued\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_admission_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.1\",provider=\"github\",status=\"admitted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_queue_age_at_dispatch_start_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"started\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_dispatch_runtime_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"0.25\",provider=\"github\",status=\"succeeded\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_retry_delay_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"2.5\",provider=\"github\",status=\"scheduled\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_accepted_to_dlq_seconds_bucket{binding_key=\"github-new-issue@v7\",le=\"60\",provider=\"github\",status=\"retry_exhausted\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 1",
            "harn_trigger_oldest_pending_age_seconds{binding_key=\"github-new-issue@v7\",provider=\"github\",tenant_id=\"tenant-a\",trigger_id=\"github-new-issue\"} 3",
            "harn_llm_calls_total{model=\"mock\",outcome=\"succeeded\",provider=\"mock\"} 1",
            "harn_llm_cost_usd_total{model=\"mock\",provider=\"mock\"} 0.01",
            "harn_llm_cache_hits_total{provider=\"mock\"} 1",
        ] {
            assert!(rendered.contains(needle), "missing {needle}\n{rendered}");
        }
    }
}