1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use time::OffsetDateTime;
12use tokio::sync::mpsc;
13use uuid::Uuid;
14
15pub use crate::http::framing::{
16 http_content_length_from_header_lines, HttpContentLengthLimitError, TEST_HTTP_MAX_BODY_BYTES,
17};
18pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
19pub use crate::triggers::test_util::clock::{
20 active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
21 ClockInstant, ClockOverrideGuard, MockClock,
22};
23
24use crate::connectors::{
25 ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
26};
27use crate::event_log::{AnyEventLog, MemoryEventLog};
28use crate::secrets::{
29 RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
30};
31use crate::triggers::{InboxIndex, ProviderId, TenantId};
32
33#[derive(Clone, Debug)]
34pub struct MemorySecretProvider {
35 provider: String,
36 inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
37}
38
39#[derive(Clone, Debug, Default)]
40struct VersionedSecret {
41 latest: Option<u64>,
42 versions: BTreeMap<u64, Vec<u8>>,
43}
44
45impl MemorySecretProvider {
46 pub fn new(provider: impl Into<String>) -> Self {
47 Self {
48 provider: provider.into(),
49 inner: Arc::new(Mutex::new(BTreeMap::new())),
50 }
51 }
52
53 pub fn empty() -> Self {
54 Self::new("connector-testkit")
55 }
56
57 pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
58 self.insert(id, value);
59 self
60 }
61
62 pub fn with_scoped_secret(
63 self,
64 namespace: impl Into<String>,
65 tenant_id: impl AsRef<str>,
66 binding_id: impl AsRef<str>,
67 name: impl AsRef<str>,
68 value: impl AsRef<[u8]>,
69 ) -> Self {
70 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
71 self.with_secret(id, value)
72 }
73
74 pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
75 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
76 insert_secret(&mut inner, id, value.as_ref().to_vec());
77 }
78
79 pub fn insert_scoped(
80 &mut self,
81 namespace: impl Into<String>,
82 tenant_id: impl AsRef<str>,
83 binding_id: impl AsRef<str>,
84 name: impl AsRef<str>,
85 value: impl AsRef<[u8]>,
86 ) -> SecretId {
87 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
88 self.insert(id.clone(), value);
89 id
90 }
91
92 pub fn snapshot(&self) -> Vec<SecretMeta> {
93 let inner = self.inner.lock().expect("memory secret provider poisoned");
94 inner
95 .iter()
96 .filter_map(|((namespace, name), secret)| {
97 let latest = secret.latest?;
98 Some(SecretMeta {
99 id: SecretId::new(namespace.clone(), name.clone())
100 .with_version(SecretVersion::Exact(latest)),
101 provider: self.provider.clone(),
102 })
103 })
104 .collect()
105 }
106}
107
108#[async_trait]
109impl SecretProvider for MemorySecretProvider {
110 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
111 let inner = self.inner.lock().expect("memory secret provider poisoned");
112 let secret = inner
113 .get(&(id.namespace.clone(), id.name.clone()))
114 .ok_or_else(|| SecretError::NotFound {
115 provider: self.provider.clone(),
116 id: id.clone(),
117 })?;
118 let version = match id.version {
119 SecretVersion::Latest => secret.latest,
120 SecretVersion::Exact(version) => Some(version),
121 }
122 .ok_or_else(|| SecretError::NotFound {
123 provider: self.provider.clone(),
124 id: id.clone(),
125 })?;
126 secret
127 .versions
128 .get(&version)
129 .cloned()
130 .map(SecretBytes::from)
131 .ok_or_else(|| SecretError::NotFound {
132 provider: self.provider.clone(),
133 id: id.clone(),
134 })
135 }
136
137 async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
138 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
139 let value = value.with_exposed(|bytes| bytes.to_vec());
140 insert_secret(&mut inner, id.clone(), value);
141 Ok(())
142 }
143
144 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
145 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
146 let key = (id.namespace.clone(), id.name.clone());
147 let secret = inner.entry(key).or_default();
148 let from_version = secret.latest;
149 let to_version = from_version.unwrap_or(0) + 1;
150 let value = from_version
151 .and_then(|version| secret.versions.get(&version).cloned())
152 .unwrap_or_default();
153 secret.versions.insert(to_version, value);
154 secret.latest = Some(to_version);
155 Ok(RotationHandle {
156 provider: self.provider.clone(),
157 id: SecretId::new(id.namespace.clone(), id.name.clone())
158 .with_version(SecretVersion::Exact(to_version)),
159 from_version,
160 to_version: Some(to_version),
161 })
162 }
163
164 async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
165 let inner = self.inner.lock().expect("memory secret provider poisoned");
166 Ok(inner
167 .iter()
168 .filter(|((namespace, name), _)| {
169 namespace == &prefix.namespace && name.starts_with(&prefix.name)
170 })
171 .filter_map(|((namespace, name), secret)| {
172 let latest = secret.latest?;
173 Some(SecretMeta {
174 id: SecretId::new(namespace.clone(), name.clone())
175 .with_version(SecretVersion::Exact(latest)),
176 provider: self.provider.clone(),
177 })
178 })
179 .collect())
180 }
181
182 fn namespace(&self) -> &str {
183 &self.provider
184 }
185
186 fn supports_versions(&self) -> bool {
187 true
188 }
189}
190
191fn insert_secret(
192 inner: &mut BTreeMap<(String, String), VersionedSecret>,
193 id: SecretId,
194 value: Vec<u8>,
195) {
196 let secret = inner.entry((id.namespace, id.name)).or_default();
197 let version = match id.version {
198 SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
199 SecretVersion::Exact(version) => version,
200 };
201 secret.versions.insert(version, value);
202 secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
203}
204
205pub fn scoped_secret_id(
206 namespace: impl Into<String>,
207 tenant_id: impl AsRef<str>,
208 binding_id: impl AsRef<str>,
209 name: impl AsRef<str>,
210) -> SecretId {
211 SecretId::new(
212 namespace,
213 format!(
214 "tenants/{}/bindings/{}/{}",
215 tenant_id.as_ref(),
216 binding_id.as_ref(),
217 name.as_ref()
218 ),
219 )
220}
221
222#[derive(Clone)]
223pub struct ConnectorTestkit {
224 pub clock: Arc<MockClock>,
225 pub event_log: Arc<AnyEventLog>,
226 pub inbox: Arc<InboxIndex>,
227 pub metrics: Arc<MetricsRegistry>,
228 pub rate_limiter: Arc<RateLimiterFactory>,
229 pub secrets: Arc<MemorySecretProvider>,
230}
231
232impl ConnectorTestkit {
233 pub async fn new(start: OffsetDateTime) -> Self {
234 Self::with_secrets(start, MemorySecretProvider::empty()).await
235 }
236
237 pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
238 let clock = MockClock::new(start);
239 let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
240 let metrics = Arc::new(MetricsRegistry::default());
241 let inbox = Arc::new(
242 InboxIndex::new(event_log.clone(), metrics.clone())
243 .await
244 .expect("connector testkit inbox should initialize"),
245 );
246 Self {
247 clock,
248 event_log,
249 inbox,
250 metrics,
251 rate_limiter: Arc::new(RateLimiterFactory::default()),
252 secrets: Arc::new(secrets),
253 }
254 }
255
256 pub fn ctx(&self) -> ConnectorCtx {
257 ConnectorCtx {
258 event_log: self.event_log.clone(),
259 secrets: self.secrets.clone(),
260 inbox: self.inbox.clone(),
261 metrics: self.metrics.clone(),
262 rate_limiter: self.rate_limiter.clone(),
263 }
264 }
265
266 pub fn install_clock(&self) -> ClockOverrideGuard {
267 install_clock_override(self.clock.clone())
268 }
269}
270
271#[derive(Debug)]
272pub struct TempPackageWorkspace {
273 root: PathBuf,
274}
275
276impl TempPackageWorkspace {
277 pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
278 let root = std::env::temp_dir().join(format!(
279 "{}-{}",
280 prefix.as_ref().trim_matches('-'),
281 Uuid::new_v4()
282 ));
283 fs::create_dir_all(&root)?;
284 Ok(Self { root })
285 }
286
287 pub fn path(&self) -> &Path {
288 &self.root
289 }
290
291 pub fn write_file(
292 &self,
293 relative: impl AsRef<Path>,
294 contents: impl AsRef<[u8]>,
295 ) -> io::Result<PathBuf> {
296 let path = self.root.join(relative.as_ref());
297 if let Some(parent) = path.parent() {
298 fs::create_dir_all(parent)?;
299 }
300 fs::write(&path, contents)?;
301 Ok(path)
302 }
303
304 pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
305 self.write_file(
306 "Harn.toml",
307 format!("[package]\nname = \"{}\"\nversion = \"0.0.0-test\"\n", name),
308 )
309 }
310
311 pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
312 self.write_file(
313 "Cargo.toml",
314 format!(
315 "[package]\nname = \"{}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n",
316 name
317 ),
318 )
319 }
320
321 pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
322 self.write_file(
323 "package.json",
324 format!("{{\"name\":\"{}\",\"version\":\"0.0.0-test\"}}\n", name),
325 )
326 }
327}
328
329impl Drop for TempPackageWorkspace {
330 fn drop(&mut self) {
331 let _ = fs::remove_dir_all(&self.root);
332 }
333}
334
335pub struct HttpMockGuard;
336
337impl HttpMockGuard {
338 pub fn new() -> Self {
339 crate::http::reset_http_state();
340 Self
341 }
342
343 pub fn push(
344 &self,
345 method: impl Into<String>,
346 url_pattern: impl Into<String>,
347 responses: Vec<HttpMockResponse>,
348 ) {
349 crate::http::push_http_mock(method, url_pattern, responses);
350 }
351
352 pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
353 crate::http::http_mock_calls_snapshot()
354 }
355}
356
357impl Default for HttpMockGuard {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
363impl Drop for HttpMockGuard {
364 fn drop(&mut self) {
365 crate::http::reset_http_state();
366 }
367}
368
369#[derive(Clone, Debug, PartialEq)]
370pub enum MockStreamEvent {
371 Json(JsonValue),
372 Bytes(Vec<u8>),
373 Cancelled,
374}
375
376#[derive(Clone, Debug)]
377pub struct MockStreamHandle {
378 tx: mpsc::UnboundedSender<MockStreamEvent>,
379 cancelled: Arc<AtomicBool>,
380}
381
382#[derive(Debug)]
383pub struct MockStreamReader {
384 rx: mpsc::UnboundedReceiver<MockStreamEvent>,
385 cancelled: Arc<AtomicBool>,
386}
387
388pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
389 let (tx, rx) = mpsc::unbounded_channel();
390 let cancelled = Arc::new(AtomicBool::new(false));
391 (
392 MockStreamHandle {
393 tx,
394 cancelled: cancelled.clone(),
395 },
396 MockStreamReader { rx, cancelled },
397 )
398}
399
400impl MockStreamHandle {
401 pub fn send_json(
402 &self,
403 value: JsonValue,
404 ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
405 self.tx.send(MockStreamEvent::Json(value))
406 }
407
408 pub fn send_bytes(
409 &self,
410 value: impl Into<Vec<u8>>,
411 ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
412 self.tx.send(MockStreamEvent::Bytes(value.into()))
413 }
414
415 pub fn cancel(&self) {
416 self.cancelled.store(true, Ordering::SeqCst);
417 let _ = self.tx.send(MockStreamEvent::Cancelled);
418 }
419
420 pub fn is_cancelled(&self) -> bool {
421 self.cancelled.load(Ordering::SeqCst)
422 }
423}
424
425impl MockStreamReader {
426 pub async fn next(&mut self) -> Option<MockStreamEvent> {
427 self.rx.recv().await
428 }
429
430 pub fn is_cancelled(&self) -> bool {
431 self.cancelled.load(Ordering::SeqCst)
432 }
433}
434
435#[derive(Clone, Debug, PartialEq)]
436pub struct WebhookFixture {
437 pub raw: RawInbound,
438 pub body: Vec<u8>,
439}
440
441impl WebhookFixture {
442 pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
443 self.raw.metadata = json!({
444 "binding_id": binding.binding_id,
445 "binding_version": 1,
446 });
447 self
448 }
449
450 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
451 self.raw.tenant_id = Some(TenantId(tenant_id.into()));
452 self
453 }
454}
455
456pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
457 let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
458 let mut raw = RawInbound::new(
459 "webhook",
460 BTreeMap::from([
461 ("content-type".to_string(), "application/json".to_string()),
462 ("x-github-event".to_string(), "ping".to_string()),
463 ("x-github-delivery".to_string(), "delivery-1".to_string()),
464 (
465 "x-hub-signature-256".to_string(),
466 format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
467 ),
468 ]),
469 body.clone(),
470 );
471 raw.received_at = received_at;
472 WebhookFixture { raw, body }
473}
474
475pub fn slack_message_fixture(
476 secret: &str,
477 timestamp: i64,
478 received_at: OffsetDateTime,
479) -> WebhookFixture {
480 let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
481 let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
482 let mut raw = RawInbound::new(
483 "webhook",
484 BTreeMap::from([
485 ("content-type".to_string(), "application/json".to_string()),
486 (
487 "x-slack-request-timestamp".to_string(),
488 timestamp.to_string(),
489 ),
490 (
491 "x-slack-signature".to_string(),
492 format!(
493 "v0={}",
494 hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
495 ),
496 ),
497 ]),
498 body.clone(),
499 );
500 raw.received_at = received_at;
501 WebhookFixture { raw, body }
502}
503
504pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
505 let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
506 let mut raw = RawInbound::new(
507 "webhook",
508 BTreeMap::from([
509 ("content-type".to_string(), "application/json".to_string()),
510 (
511 "linear-signature".to_string(),
512 hmac_sha256_hex(secret.as_bytes(), &body),
513 ),
514 ]),
515 body.clone(),
516 );
517 raw.received_at = received_at;
518 WebhookFixture { raw, body }
519}
520
521pub fn notion_page_content_updated_fixture(
522 secret: &str,
523 received_at: OffsetDateTime,
524) -> WebhookFixture {
525 let body = br#"{"id":"evt_1","type":"page.content_updated","workspace_id":"ws_1","subscription_id":"sub_1","integration_id":"int_1","entity":{"id":"page_1","type":"page"},"api_version":"2022-06-28"}"#.to_vec();
526 let mut raw = RawInbound::new(
527 "webhook",
528 BTreeMap::from([
529 ("content-type".to_string(), "application/json".to_string()),
530 (
531 "x-notion-signature".to_string(),
532 format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
533 ),
534 ("request-id".to_string(), "req_1".to_string()),
535 ]),
536 body.clone(),
537 );
538 raw.received_at = received_at;
539 WebhookFixture { raw, body }
540}
541
542pub fn webhook_binding(
543 provider: impl Into<String>,
544 binding_id: impl Into<String>,
545 signing_secret: Option<SecretId>,
546) -> TriggerBinding {
547 let provider = provider.into();
548 let mut binding =
549 TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
550 let mut secrets = serde_json::Map::new();
551 if let Some(secret) = signing_secret {
552 secrets.insert(
553 "signing_secret".to_string(),
554 JsonValue::String(secret.to_string()),
555 );
556 }
557 binding.config = json!({
558 "path": format!("/hooks/{provider}"),
559 "match": {"events": ["*"]},
560 "secrets": secrets,
561 });
562 binding
563}
564
565fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
566 hex::encode(crate::connectors::hmac::hmac_sha256(secret, data))
567}
568
569pub async fn advance_until<F>(
570 clock: &MockClock,
571 timeout: StdDuration,
572 tick: StdDuration,
573 mut predicate: F,
574) -> bool
575where
576 F: FnMut() -> bool,
577{
578 let mut elapsed = StdDuration::ZERO;
579 while elapsed <= timeout {
580 if predicate() {
581 return true;
582 }
583 clock.advance_std(tick).await;
584 elapsed += tick;
585 }
586 predicate()
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secrets::SecretProvider;

    /// Parses an RFC 3339 timestamp, panicking on malformed input.
    fn parse_ts(value: &str) -> OffsetDateTime {
        let format = time::format_description::well_known::Rfc3339;
        OffsetDateTime::parse(value, &format).unwrap()
    }

    /// Copies the exposed secret bytes out so they can be compared.
    fn exposed(secret: SecretBytes) -> Vec<u8> {
        secret.with_exposed(|bytes| bytes.to_vec())
    }

    #[tokio::test]
    async fn memory_secret_provider_scopes_and_versions_secrets() {
        let mut provider = MemorySecretProvider::new("test");
        let scoped = provider.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
        provider
            .put(&scoped, SecretBytes::from("v2"))
            .await
            .expect("put latest");

        // Unversioned lookups resolve to the newest write.
        let latest = provider.get(&scoped).await.expect("latest");
        assert_eq!(exposed(latest), b"v2".to_vec());

        // The first write stays reachable through its exact version.
        let pinned = scoped.clone().with_version(SecretVersion::Exact(1));
        let first = provider.get(&pinned).await.expect("v1");
        assert_eq!(exposed(first), b"v1".to_vec());

        // A different tenant scope must not see the secret.
        let other_tenant = scoped_secret_id("github", "tenant-b", "binding-a", "token");
        assert!(provider.get(&other_tenant).await.is_err());
    }

    #[tokio::test]
    async fn connector_testkit_controls_clock_and_deadlines() {
        let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
        let _guard = kit.install_clock();
        let timeout = StdDuration::from_millis(20);
        let tick = StdDuration::from_millis(10);

        let mut fired = false;
        let timed_out = !advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(timed_out);

        fired = true;
        let satisfied = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(satisfied);

        // Only the first call stepped the clock: three 10 ms ticks.
        assert_eq!(instant_now().as_millis(), 30);
    }

    #[tokio::test]
    async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
        let (handle, mut reader) = mock_stream();

        handle.send_json(json!({"event": "one"})).expect("send");
        let first = reader.next().await;
        assert_eq!(first, Some(MockStreamEvent::Json(json!({"event": "one"}))));

        handle.cancel();
        let second = reader.next().await;
        assert_eq!(second, Some(MockStreamEvent::Cancelled));
        assert!(reader.is_cancelled());
    }

    #[test]
    fn temp_workspace_writes_package_markers() {
        let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
        workspace.write_harn_package("demo").expect("harn package");
        workspace
            .write_cargo_package("demo")
            .expect("cargo package");
        workspace.write_npm_package("demo").expect("npm package");

        let root = workspace.path();
        assert!(root.join("Harn.toml").exists());
        assert!(root.join("Cargo.toml").exists());
        assert!(root.join("package.json").exists());
    }

    #[test]
    fn webhook_fixtures_include_provider_signatures() {
        let received_at = parse_ts("2026-04-19T00:00:00Z");
        let secret = "topsecret";

        let github = github_ping_fixture(secret, received_at);
        let github_signature = &github.raw.headers["x-hub-signature-256"];
        assert!(github_signature.starts_with("sha256="));

        let slack = slack_message_fixture(secret, received_at.unix_timestamp(), received_at);
        let slack_signature = &slack.raw.headers["x-slack-signature"];
        assert!(slack_signature.starts_with("v0="));

        let linear = linear_issue_update_fixture(secret, received_at);
        let linear_signature = &linear.raw.headers["linear-signature"];
        assert_eq!(linear_signature.len(), 64);

        let notion = notion_page_content_updated_fixture(secret, received_at);
        let notion_signature = &notion.raw.headers["x-notion-signature"];
        assert!(notion_signature.starts_with("sha256="));
    }
}