1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use time::OffsetDateTime;
12use tokio::sync::mpsc;
13use uuid::Uuid;
14
15pub use crate::http::framing::{
16 http_content_length_from_header_lines, HttpContentLengthLimitError, TEST_HTTP_MAX_BODY_BYTES,
17};
18pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
19pub use crate::triggers::test_util::clock::{
20 active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
21 ClockInstant, ClockOverrideGuard, MockClock,
22};
23
24use crate::connectors::{
25 ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
26};
27use crate::event_log::{AnyEventLog, MemoryEventLog};
28use crate::secrets::{
29 RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
30};
31use crate::triggers::{InboxIndex, ProviderId, TenantId};
32
33#[derive(Clone, Debug)]
34pub struct MemorySecretProvider {
35 provider: String,
36 inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
37}
38
39#[derive(Clone, Debug, Default)]
40struct VersionedSecret {
41 latest: Option<u64>,
42 versions: BTreeMap<u64, Vec<u8>>,
43}
44
45impl MemorySecretProvider {
46 pub fn new(provider: impl Into<String>) -> Self {
47 Self {
48 provider: provider.into(),
49 inner: Arc::new(Mutex::new(BTreeMap::new())),
50 }
51 }
52
53 pub fn empty() -> Self {
54 Self::new("connector-testkit")
55 }
56
57 pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
58 self.insert(id, value);
59 self
60 }
61
62 pub fn with_scoped_secret(
63 self,
64 namespace: impl Into<String>,
65 tenant_id: impl AsRef<str>,
66 binding_id: impl AsRef<str>,
67 name: impl AsRef<str>,
68 value: impl AsRef<[u8]>,
69 ) -> Self {
70 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
71 self.with_secret(id, value)
72 }
73
74 pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
75 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
76 insert_secret(&mut inner, id, value.as_ref().to_vec());
77 }
78
79 pub fn insert_scoped(
80 &mut self,
81 namespace: impl Into<String>,
82 tenant_id: impl AsRef<str>,
83 binding_id: impl AsRef<str>,
84 name: impl AsRef<str>,
85 value: impl AsRef<[u8]>,
86 ) -> SecretId {
87 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
88 self.insert(id.clone(), value);
89 id
90 }
91
92 pub fn snapshot(&self) -> Vec<SecretMeta> {
93 let inner = self.inner.lock().expect("memory secret provider poisoned");
94 inner
95 .iter()
96 .filter_map(|((namespace, name), secret)| {
97 let latest = secret.latest?;
98 Some(SecretMeta {
99 id: SecretId::new(namespace.clone(), name.clone())
100 .with_version(SecretVersion::Exact(latest)),
101 provider: self.provider.clone(),
102 })
103 })
104 .collect()
105 }
106}
107
108#[async_trait]
109impl SecretProvider for MemorySecretProvider {
110 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
111 let inner = self.inner.lock().expect("memory secret provider poisoned");
112 let secret = inner
113 .get(&(id.namespace.clone(), id.name.clone()))
114 .ok_or_else(|| SecretError::NotFound {
115 provider: self.provider.clone(),
116 id: id.clone(),
117 })?;
118 let version = match id.version {
119 SecretVersion::Latest => secret.latest,
120 SecretVersion::Exact(version) => Some(version),
121 }
122 .ok_or_else(|| SecretError::NotFound {
123 provider: self.provider.clone(),
124 id: id.clone(),
125 })?;
126 secret
127 .versions
128 .get(&version)
129 .cloned()
130 .map(SecretBytes::from)
131 .ok_or_else(|| SecretError::NotFound {
132 provider: self.provider.clone(),
133 id: id.clone(),
134 })
135 }
136
137 async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
138 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
139 let value = value.with_exposed(|bytes| bytes.to_vec());
140 insert_secret(&mut inner, id.clone(), value);
141 Ok(())
142 }
143
144 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
145 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
146 let key = (id.namespace.clone(), id.name.clone());
147 let secret = inner.entry(key).or_default();
148 let from_version = secret.latest;
149 let to_version = from_version.unwrap_or(0) + 1;
150 let value = from_version
151 .and_then(|version| secret.versions.get(&version).cloned())
152 .unwrap_or_default();
153 secret.versions.insert(to_version, value);
154 secret.latest = Some(to_version);
155 Ok(RotationHandle {
156 provider: self.provider.clone(),
157 id: SecretId::new(id.namespace.clone(), id.name.clone())
158 .with_version(SecretVersion::Exact(to_version)),
159 from_version,
160 to_version: Some(to_version),
161 })
162 }
163
164 async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
165 let inner = self.inner.lock().expect("memory secret provider poisoned");
166 Ok(inner
167 .iter()
168 .filter(|((namespace, name), _)| {
169 namespace == &prefix.namespace && name.starts_with(&prefix.name)
170 })
171 .filter_map(|((namespace, name), secret)| {
172 let latest = secret.latest?;
173 Some(SecretMeta {
174 id: SecretId::new(namespace.clone(), name.clone())
175 .with_version(SecretVersion::Exact(latest)),
176 provider: self.provider.clone(),
177 })
178 })
179 .collect())
180 }
181
182 fn namespace(&self) -> &str {
183 &self.provider
184 }
185
186 fn supports_versions(&self) -> bool {
187 true
188 }
189}
190
191fn insert_secret(
192 inner: &mut BTreeMap<(String, String), VersionedSecret>,
193 id: SecretId,
194 value: Vec<u8>,
195) {
196 let secret = inner.entry((id.namespace, id.name)).or_default();
197 let version = match id.version {
198 SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
199 SecretVersion::Exact(version) => version,
200 };
201 secret.versions.insert(version, value);
202 secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
203}
204
205pub fn scoped_secret_id(
206 namespace: impl Into<String>,
207 tenant_id: impl AsRef<str>,
208 binding_id: impl AsRef<str>,
209 name: impl AsRef<str>,
210) -> SecretId {
211 SecretId::new(
212 namespace,
213 format!(
214 "tenants/{}/bindings/{}/{}",
215 tenant_id.as_ref(),
216 binding_id.as_ref(),
217 name.as_ref()
218 ),
219 )
220}
221
222#[derive(Clone)]
223pub struct ConnectorTestkit {
224 pub clock: Arc<MockClock>,
225 pub event_log: Arc<AnyEventLog>,
226 pub inbox: Arc<InboxIndex>,
227 pub metrics: Arc<MetricsRegistry>,
228 pub rate_limiter: Arc<RateLimiterFactory>,
229 pub secrets: Arc<MemorySecretProvider>,
230}
231
232impl ConnectorTestkit {
233 pub async fn new(start: OffsetDateTime) -> Self {
234 Self::with_secrets(start, MemorySecretProvider::empty()).await
235 }
236
237 pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
238 let clock = MockClock::new(start);
239 let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
240 let metrics = Arc::new(MetricsRegistry::default());
241 let inbox = Arc::new(
242 InboxIndex::new(event_log.clone(), metrics.clone())
243 .await
244 .expect("connector testkit inbox should initialize"),
245 );
246 Self {
247 clock,
248 event_log,
249 inbox,
250 metrics,
251 rate_limiter: Arc::new(RateLimiterFactory::default()),
252 secrets: Arc::new(secrets),
253 }
254 }
255
256 pub fn ctx(&self) -> ConnectorCtx {
257 ConnectorCtx {
258 event_log: self.event_log.clone(),
259 secrets: self.secrets.clone(),
260 inbox: self.inbox.clone(),
261 metrics: self.metrics.clone(),
262 rate_limiter: self.rate_limiter.clone(),
263 }
264 }
265
266 pub fn install_clock(&self) -> ClockOverrideGuard {
267 install_clock_override(self.clock.clone())
268 }
269}
270
271#[derive(Debug)]
272pub struct TempPackageWorkspace {
273 root: PathBuf,
274}
275
276impl TempPackageWorkspace {
277 pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
278 let root = std::env::temp_dir().join(format!(
279 "{}-{}",
280 prefix.as_ref().trim_matches('-'),
281 Uuid::new_v4()
282 ));
283 fs::create_dir_all(&root)?;
284 Ok(Self { root })
285 }
286
287 pub fn path(&self) -> &Path {
288 &self.root
289 }
290
291 pub fn write_file(
292 &self,
293 relative: impl AsRef<Path>,
294 contents: impl AsRef<[u8]>,
295 ) -> io::Result<PathBuf> {
296 let path = self.root.join(relative.as_ref());
297 if let Some(parent) = path.parent() {
298 fs::create_dir_all(parent)?;
299 }
300 fs::write(&path, contents)?;
301 Ok(path)
302 }
303
304 pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
305 self.write_file(
306 "Harn.toml",
307 format!("[package]\nname = \"{name}\"\nversion = \"0.0.0-test\"\n"),
308 )
309 }
310
311 pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
312 self.write_file(
313 "Cargo.toml",
314 format!("[package]\nname = \"{name}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n"),
315 )
316 }
317
318 pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
319 self.write_file(
320 "package.json",
321 format!("{{\"name\":\"{name}\",\"version\":\"0.0.0-test\"}}\n"),
322 )
323 }
324}
325
326impl Drop for TempPackageWorkspace {
327 fn drop(&mut self) {
328 let _ = fs::remove_dir_all(&self.root);
329 }
330}
331
332pub struct HttpMockGuard;
333
334impl HttpMockGuard {
335 pub fn new() -> Self {
336 crate::http::reset_http_state();
337 Self
338 }
339
340 pub fn push(
341 &self,
342 method: impl Into<String>,
343 url_pattern: impl Into<String>,
344 responses: Vec<HttpMockResponse>,
345 ) {
346 crate::http::push_http_mock(method, url_pattern, responses);
347 }
348
349 pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
350 crate::http::http_mock_calls_snapshot()
351 }
352}
353
354impl Default for HttpMockGuard {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl Drop for HttpMockGuard {
361 fn drop(&mut self) {
362 crate::http::reset_http_state();
363 }
364}
365
366#[derive(Clone, Debug, PartialEq, Eq)]
367pub enum MockStreamEvent {
368 Json(JsonValue),
369 Bytes(Vec<u8>),
370 Cancelled,
371}
372
373#[derive(Clone, Debug)]
374pub struct MockStreamHandle {
375 tx: mpsc::UnboundedSender<MockStreamEvent>,
376 cancelled: Arc<AtomicBool>,
377}
378
379#[derive(Debug)]
380pub struct MockStreamReader {
381 rx: mpsc::UnboundedReceiver<MockStreamEvent>,
382 cancelled: Arc<AtomicBool>,
383}
384
385pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
386 let (tx, rx) = mpsc::unbounded_channel();
387 let cancelled = Arc::new(AtomicBool::new(false));
388 (
389 MockStreamHandle {
390 tx,
391 cancelled: cancelled.clone(),
392 },
393 MockStreamReader { rx, cancelled },
394 )
395}
396
397impl MockStreamHandle {
398 pub fn send_json(
399 &self,
400 value: JsonValue,
401 ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
402 self.tx.send(MockStreamEvent::Json(value))
403 }
404
405 pub fn send_bytes(
406 &self,
407 value: impl Into<Vec<u8>>,
408 ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
409 self.tx.send(MockStreamEvent::Bytes(value.into()))
410 }
411
412 pub fn cancel(&self) {
413 self.cancelled.store(true, Ordering::SeqCst);
414 let _ = self.tx.send(MockStreamEvent::Cancelled);
415 }
416
417 pub fn is_cancelled(&self) -> bool {
418 self.cancelled.load(Ordering::SeqCst)
419 }
420}
421
422impl MockStreamReader {
423 pub async fn next(&mut self) -> Option<MockStreamEvent> {
424 self.rx.recv().await
425 }
426
427 pub fn is_cancelled(&self) -> bool {
428 self.cancelled.load(Ordering::SeqCst)
429 }
430}
431
432#[allow(clippy::derive_partial_eq_without_eq)]
434#[derive(Clone, Debug, PartialEq)]
435pub struct WebhookFixture {
436 pub raw: RawInbound,
437 pub body: Vec<u8>,
438}
439
440impl WebhookFixture {
441 pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
442 self.raw.metadata = json!({
443 "binding_id": binding.binding_id,
444 "binding_version": 1,
445 });
446 self
447 }
448
449 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
450 self.raw.tenant_id = Some(TenantId(tenant_id.into()));
451 self
452 }
453}
454
455pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
456 let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
457 let mut raw = RawInbound::new(
458 "webhook",
459 BTreeMap::from([
460 ("content-type".to_string(), "application/json".to_string()),
461 ("x-github-event".to_string(), "ping".to_string()),
462 ("x-github-delivery".to_string(), "delivery-1".to_string()),
463 (
464 "x-hub-signature-256".to_string(),
465 format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
466 ),
467 ]),
468 body.clone(),
469 );
470 raw.received_at = received_at;
471 WebhookFixture { raw, body }
472}
473
474pub fn slack_message_fixture(
475 secret: &str,
476 timestamp: i64,
477 received_at: OffsetDateTime,
478) -> WebhookFixture {
479 let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
480 let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
481 let mut raw = RawInbound::new(
482 "webhook",
483 BTreeMap::from([
484 ("content-type".to_string(), "application/json".to_string()),
485 (
486 "x-slack-request-timestamp".to_string(),
487 timestamp.to_string(),
488 ),
489 (
490 "x-slack-signature".to_string(),
491 format!(
492 "v0={}",
493 hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
494 ),
495 ),
496 ]),
497 body.clone(),
498 );
499 raw.received_at = received_at;
500 WebhookFixture { raw, body }
501}
502
503pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
504 let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
505 let mut raw = RawInbound::new(
506 "webhook",
507 BTreeMap::from([
508 ("content-type".to_string(), "application/json".to_string()),
509 (
510 "linear-signature".to_string(),
511 hmac_sha256_hex(secret.as_bytes(), &body),
512 ),
513 ]),
514 body.clone(),
515 );
516 raw.received_at = received_at;
517 WebhookFixture { raw, body }
518}
519
520pub fn notion_page_content_updated_fixture(
521 secret: &str,
522 received_at: OffsetDateTime,
523) -> WebhookFixture {
524 let body = br#"{"id":"evt_1","type":"page.content_updated","workspace_id":"ws_1","subscription_id":"sub_1","integration_id":"int_1","entity":{"id":"page_1","type":"page"},"api_version":"2022-06-28"}"#.to_vec();
525 let mut raw = RawInbound::new(
526 "webhook",
527 BTreeMap::from([
528 ("content-type".to_string(), "application/json".to_string()),
529 (
530 "x-notion-signature".to_string(),
531 format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
532 ),
533 ("request-id".to_string(), "req_1".to_string()),
534 ]),
535 body.clone(),
536 );
537 raw.received_at = received_at;
538 WebhookFixture { raw, body }
539}
540
541pub fn webhook_binding(
542 provider: impl Into<String>,
543 binding_id: impl Into<String>,
544 signing_secret: Option<SecretId>,
545) -> TriggerBinding {
546 let provider = provider.into();
547 let mut binding =
548 TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
549 let mut secrets = serde_json::Map::new();
550 if let Some(secret) = signing_secret {
551 secrets.insert(
552 "signing_secret".to_string(),
553 JsonValue::String(secret.to_string()),
554 );
555 }
556 binding.config = json!({
557 "path": format!("/hooks/{provider}"),
558 "match": {"events": ["*"]},
559 "secrets": secrets,
560 });
561 binding
562}
563
564fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
565 hex::encode(crate::connectors::hmac::hmac_sha256(secret, data))
566}
567
568pub async fn advance_until<F>(
569 clock: &MockClock,
570 timeout: StdDuration,
571 tick: StdDuration,
572 mut predicate: F,
573) -> bool
574where
575 F: FnMut() -> bool,
576{
577 let mut elapsed = StdDuration::ZERO;
578 while elapsed <= timeout {
579 if predicate() {
580 return true;
581 }
582 clock.advance_std(tick).await;
583 elapsed += tick;
584 }
585 predicate()
586}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591 use crate::secrets::SecretProvider;
592
593 fn parse_ts(value: &str) -> OffsetDateTime {
594 OffsetDateTime::parse(value, &time::format_description::well_known::Rfc3339).unwrap()
595 }
596
597 #[tokio::test]
598 async fn memory_secret_provider_scopes_and_versions_secrets() {
599 let mut provider = MemorySecretProvider::new("test");
600 let scoped = provider.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
601 provider
602 .put(&scoped, SecretBytes::from("v2"))
603 .await
604 .expect("put latest");
605
606 let latest = provider.get(&scoped).await.expect("latest");
607 assert_eq!(latest.with_exposed(|bytes| bytes.to_vec()), b"v2".to_vec());
608 let first = provider
609 .get(&scoped.clone().with_version(SecretVersion::Exact(1)))
610 .await
611 .expect("v1");
612 assert_eq!(first.with_exposed(|bytes| bytes.to_vec()), b"v1".to_vec());
613 assert!(provider
614 .get(&scoped_secret_id(
615 "github",
616 "tenant-b",
617 "binding-a",
618 "token"
619 ))
620 .await
621 .is_err());
622 }
623
624 #[tokio::test]
625 async fn connector_testkit_controls_clock_and_deadlines() {
626 let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
627 let _guard = kit.install_clock();
628 let mut fired = false;
629 assert!(
630 !advance_until(
631 &kit.clock,
632 StdDuration::from_millis(20),
633 StdDuration::from_millis(10),
634 || fired,
635 )
636 .await
637 );
638 fired = true;
639 assert!(
640 advance_until(
641 &kit.clock,
642 StdDuration::from_millis(20),
643 StdDuration::from_millis(10),
644 || fired,
645 )
646 .await
647 );
648 assert_eq!(instant_now().as_millis(), 30);
649 }
650
651 #[tokio::test]
652 async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
653 let (handle, mut reader) = mock_stream();
654 handle.send_json(json!({"event": "one"})).expect("send");
655 assert_eq!(
656 reader.next().await,
657 Some(MockStreamEvent::Json(json!({"event": "one"})))
658 );
659 handle.cancel();
660 assert_eq!(reader.next().await, Some(MockStreamEvent::Cancelled));
661 assert!(reader.is_cancelled());
662 }
663
664 #[test]
665 fn temp_workspace_writes_package_markers() {
666 let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
667 workspace.write_harn_package("demo").expect("harn package");
668 workspace
669 .write_cargo_package("demo")
670 .expect("cargo package");
671 workspace.write_npm_package("demo").expect("npm package");
672 assert!(workspace.path().join("Harn.toml").exists());
673 assert!(workspace.path().join("Cargo.toml").exists());
674 assert!(workspace.path().join("package.json").exists());
675 }
676
677 #[test]
678 fn webhook_fixtures_include_provider_signatures() {
679 let received_at = parse_ts("2026-04-19T00:00:00Z");
680 let github = github_ping_fixture("topsecret", received_at);
681 assert!(github.raw.headers["x-hub-signature-256"].starts_with("sha256="));
682 let slack = slack_message_fixture("topsecret", received_at.unix_timestamp(), received_at);
683 assert!(slack.raw.headers["x-slack-signature"].starts_with("v0="));
684 let linear = linear_issue_update_fixture("topsecret", received_at);
685 assert_eq!(linear.raw.headers["linear-signature"].len(), 64);
686 let notion = notion_page_content_updated_fixture("topsecret", received_at);
687 assert!(notion.raw.headers["x-notion-signature"].starts_with("sha256="));
688 }
689}