1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use sha2::{Digest, Sha256};
12use time::OffsetDateTime;
13use tokio::sync::mpsc;
14use uuid::Uuid;
15
16pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
17pub use crate::triggers::test_util::clock::{
18 active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
19 ClockInstant, ClockOverrideGuard, MockClock,
20};
21
22use crate::connectors::{
23 ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
24};
25use crate::event_log::{AnyEventLog, MemoryEventLog};
26use crate::secrets::{
27 RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
28};
29use crate::triggers::{InboxIndex, ProviderId, TenantId};
30
/// In-memory secret store used by the connector testkit.
///
/// Secrets are keyed by `(namespace, name)` and each key holds a set of
/// numbered versions. Cloning the provider clones the `Arc`, so all clones
/// share the same underlying map.
#[derive(Clone, Debug)]
pub struct MemorySecretProvider {
    /// Provider label copied into the `SecretMeta`, `RotationHandle` and
    /// `SecretError::NotFound` values this type produces.
    provider: String,
    /// Mutex-guarded map from `(namespace, name)` to the stored versions.
    inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
}
36
/// Every stored version of a single secret plus the newest version number.
#[derive(Clone, Debug, Default)]
struct VersionedSecret {
    /// Highest version number written so far; `None` until the first write.
    latest: Option<u64>,
    /// Secret bytes keyed by version number.
    versions: BTreeMap<u64, Vec<u8>>,
}
42
43impl MemorySecretProvider {
44 pub fn new(provider: impl Into<String>) -> Self {
45 Self {
46 provider: provider.into(),
47 inner: Arc::new(Mutex::new(BTreeMap::new())),
48 }
49 }
50
51 pub fn empty() -> Self {
52 Self::new("connector-testkit")
53 }
54
55 pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
56 self.insert(id, value);
57 self
58 }
59
60 pub fn with_scoped_secret(
61 self,
62 namespace: impl Into<String>,
63 tenant_id: impl AsRef<str>,
64 binding_id: impl AsRef<str>,
65 name: impl AsRef<str>,
66 value: impl AsRef<[u8]>,
67 ) -> Self {
68 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
69 self.with_secret(id, value)
70 }
71
72 pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
73 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
74 insert_secret(&mut inner, id, value.as_ref().to_vec());
75 }
76
77 pub fn insert_scoped(
78 &mut self,
79 namespace: impl Into<String>,
80 tenant_id: impl AsRef<str>,
81 binding_id: impl AsRef<str>,
82 name: impl AsRef<str>,
83 value: impl AsRef<[u8]>,
84 ) -> SecretId {
85 let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
86 self.insert(id.clone(), value);
87 id
88 }
89
90 pub fn snapshot(&self) -> Vec<SecretMeta> {
91 let inner = self.inner.lock().expect("memory secret provider poisoned");
92 inner
93 .iter()
94 .filter_map(|((namespace, name), secret)| {
95 let latest = secret.latest?;
96 Some(SecretMeta {
97 id: SecretId::new(namespace.clone(), name.clone())
98 .with_version(SecretVersion::Exact(latest)),
99 provider: self.provider.clone(),
100 })
101 })
102 .collect()
103 }
104}
105
106#[async_trait]
107impl SecretProvider for MemorySecretProvider {
108 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
109 let inner = self.inner.lock().expect("memory secret provider poisoned");
110 let secret = inner
111 .get(&(id.namespace.clone(), id.name.clone()))
112 .ok_or_else(|| SecretError::NotFound {
113 provider: self.provider.clone(),
114 id: id.clone(),
115 })?;
116 let version = match id.version {
117 SecretVersion::Latest => secret.latest,
118 SecretVersion::Exact(version) => Some(version),
119 }
120 .ok_or_else(|| SecretError::NotFound {
121 provider: self.provider.clone(),
122 id: id.clone(),
123 })?;
124 secret
125 .versions
126 .get(&version)
127 .cloned()
128 .map(SecretBytes::from)
129 .ok_or_else(|| SecretError::NotFound {
130 provider: self.provider.clone(),
131 id: id.clone(),
132 })
133 }
134
135 async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
136 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
137 let value = value.with_exposed(|bytes| bytes.to_vec());
138 insert_secret(&mut inner, id.clone(), value);
139 Ok(())
140 }
141
142 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
143 let mut inner = self.inner.lock().expect("memory secret provider poisoned");
144 let key = (id.namespace.clone(), id.name.clone());
145 let secret = inner.entry(key).or_default();
146 let from_version = secret.latest;
147 let to_version = from_version.unwrap_or(0) + 1;
148 let value = from_version
149 .and_then(|version| secret.versions.get(&version).cloned())
150 .unwrap_or_default();
151 secret.versions.insert(to_version, value);
152 secret.latest = Some(to_version);
153 Ok(RotationHandle {
154 provider: self.provider.clone(),
155 id: SecretId::new(id.namespace.clone(), id.name.clone())
156 .with_version(SecretVersion::Exact(to_version)),
157 from_version,
158 to_version: Some(to_version),
159 })
160 }
161
162 async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
163 let inner = self.inner.lock().expect("memory secret provider poisoned");
164 Ok(inner
165 .iter()
166 .filter(|((namespace, name), _)| {
167 namespace == &prefix.namespace && name.starts_with(&prefix.name)
168 })
169 .filter_map(|((namespace, name), secret)| {
170 let latest = secret.latest?;
171 Some(SecretMeta {
172 id: SecretId::new(namespace.clone(), name.clone())
173 .with_version(SecretVersion::Exact(latest)),
174 provider: self.provider.clone(),
175 })
176 })
177 .collect())
178 }
179
180 fn namespace(&self) -> &str {
181 &self.provider
182 }
183
184 fn supports_versions(&self) -> bool {
185 true
186 }
187}
188
189fn insert_secret(
190 inner: &mut BTreeMap<(String, String), VersionedSecret>,
191 id: SecretId,
192 value: Vec<u8>,
193) {
194 let secret = inner.entry((id.namespace, id.name)).or_default();
195 let version = match id.version {
196 SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
197 SecretVersion::Exact(version) => version,
198 };
199 secret.versions.insert(version, value);
200 secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
201}
202
203pub fn scoped_secret_id(
204 namespace: impl Into<String>,
205 tenant_id: impl AsRef<str>,
206 binding_id: impl AsRef<str>,
207 name: impl AsRef<str>,
208) -> SecretId {
209 SecretId::new(
210 namespace,
211 format!(
212 "tenants/{}/bindings/{}/{}",
213 tenant_id.as_ref(),
214 binding_id.as_ref(),
215 name.as_ref()
216 ),
217 )
218}
219
/// Bundle of shared test doubles needed to exercise a connector.
///
/// Every field is an `Arc`, so cloning the kit shares the same instances.
#[derive(Clone)]
pub struct ConnectorTestkit {
    /// Mock clock created from the start time given to the constructor.
    pub clock: Arc<MockClock>,
    /// In-memory event log.
    pub event_log: Arc<AnyEventLog>,
    /// Inbox index built on top of `event_log` and `metrics`.
    pub inbox: Arc<InboxIndex>,
    /// Metrics registry shared with the inbox.
    pub metrics: Arc<MetricsRegistry>,
    /// Default-constructed rate limiter factory.
    pub rate_limiter: Arc<RateLimiterFactory>,
    /// In-memory secret provider handed to connectors through `ctx()`.
    pub secrets: Arc<MemorySecretProvider>,
}
229
230impl ConnectorTestkit {
231 pub async fn new(start: OffsetDateTime) -> Self {
232 Self::with_secrets(start, MemorySecretProvider::empty()).await
233 }
234
235 pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
236 let clock = MockClock::new(start);
237 let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
238 let metrics = Arc::new(MetricsRegistry::default());
239 let inbox = Arc::new(
240 InboxIndex::new(event_log.clone(), metrics.clone())
241 .await
242 .expect("connector testkit inbox should initialize"),
243 );
244 Self {
245 clock,
246 event_log,
247 inbox,
248 metrics,
249 rate_limiter: Arc::new(RateLimiterFactory::default()),
250 secrets: Arc::new(secrets),
251 }
252 }
253
254 pub fn ctx(&self) -> ConnectorCtx {
255 ConnectorCtx {
256 event_log: self.event_log.clone(),
257 secrets: self.secrets.clone(),
258 inbox: self.inbox.clone(),
259 metrics: self.metrics.clone(),
260 rate_limiter: self.rate_limiter.clone(),
261 }
262 }
263
264 pub fn install_clock(&self) -> ClockOverrideGuard {
265 install_clock_override(self.clock.clone())
266 }
267}
268
/// Scratch directory for package fixtures; removed again when dropped.
#[derive(Debug)]
pub struct TempPackageWorkspace {
    /// Directory created by `new`; all files are written beneath it.
    root: PathBuf,
}
273
274impl TempPackageWorkspace {
275 pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
276 let root = std::env::temp_dir().join(format!(
277 "{}-{}",
278 prefix.as_ref().trim_matches('-'),
279 Uuid::new_v4()
280 ));
281 fs::create_dir_all(&root)?;
282 Ok(Self { root })
283 }
284
285 pub fn path(&self) -> &Path {
286 &self.root
287 }
288
289 pub fn write_file(
290 &self,
291 relative: impl AsRef<Path>,
292 contents: impl AsRef<[u8]>,
293 ) -> io::Result<PathBuf> {
294 let path = self.root.join(relative.as_ref());
295 if let Some(parent) = path.parent() {
296 fs::create_dir_all(parent)?;
297 }
298 fs::write(&path, contents)?;
299 Ok(path)
300 }
301
302 pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
303 self.write_file(
304 "Harn.toml",
305 format!("[package]\nname = \"{}\"\nversion = \"0.0.0-test\"\n", name),
306 )
307 }
308
309 pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
310 self.write_file(
311 "Cargo.toml",
312 format!(
313 "[package]\nname = \"{}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n",
314 name
315 ),
316 )
317 }
318
319 pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
320 self.write_file(
321 "package.json",
322 format!("{{\"name\":\"{}\",\"version\":\"0.0.0-test\"}}\n", name),
323 )
324 }
325}
326
/// Removes the workspace directory and everything in it on drop.
impl Drop for TempPackageWorkspace {
    fn drop(&mut self) {
        // Best effort: a failed cleanup is ignored rather than panicking in drop.
        let _ = fs::remove_dir_all(&self.root);
    }
}
332
/// Scope guard around the crate's HTTP mock state: `new` and `drop` both call
/// `crate::http::reset_http_state`.
pub struct HttpMockGuard;
334
impl HttpMockGuard {
    /// Resets the HTTP mock state and returns a guard that resets it again
    /// when dropped.
    pub fn new() -> Self {
        crate::http::reset_http_state();
        Self
    }

    /// Registers mock `responses` for `method` and `url_pattern` by
    /// forwarding to `crate::http::push_http_mock`.
    pub fn push(
        &self,
        method: impl Into<String>,
        url_pattern: impl Into<String>,
        responses: Vec<HttpMockResponse>,
    ) {
        crate::http::push_http_mock(method, url_pattern, responses);
    }

    /// Returns the call snapshot reported by
    /// `crate::http::http_mock_calls_snapshot`.
    pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
        crate::http::http_mock_calls_snapshot()
    }
}
354
/// `default()` is the same as `new()`, including the state reset.
impl Default for HttpMockGuard {
    fn default() -> Self {
        Self::new()
    }
}
360
/// Resets the HTTP mock state again when the guard goes out of scope.
impl Drop for HttpMockGuard {
    fn drop(&mut self) {
        crate::http::reset_http_state();
    }
}
366
/// Item delivered through a mock stream.
#[derive(Clone, Debug, PartialEq)]
pub enum MockStreamEvent {
    /// A JSON payload, sent with `MockStreamHandle::send_json`.
    Json(JsonValue),
    /// A raw byte payload, sent with `MockStreamHandle::send_bytes`.
    Bytes(Vec<u8>),
    /// Marker sent by `MockStreamHandle::cancel`.
    Cancelled,
}
373
/// Sending half of a mock stream; cloneable so several producers can feed
/// the same reader.
#[derive(Clone, Debug)]
pub struct MockStreamHandle {
    /// Unbounded channel sender for stream events.
    tx: mpsc::UnboundedSender<MockStreamEvent>,
    /// Cancellation flag shared with the reader.
    cancelled: Arc<AtomicBool>,
}
379
/// Receiving half of a mock stream.
#[derive(Debug)]
pub struct MockStreamReader {
    /// Unbounded channel receiver for stream events.
    rx: mpsc::UnboundedReceiver<MockStreamEvent>,
    /// Cancellation flag shared with the handle.
    cancelled: Arc<AtomicBool>,
}
385
386pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
387 let (tx, rx) = mpsc::unbounded_channel();
388 let cancelled = Arc::new(AtomicBool::new(false));
389 (
390 MockStreamHandle {
391 tx,
392 cancelled: cancelled.clone(),
393 },
394 MockStreamReader { rx, cancelled },
395 )
396}
397
impl MockStreamHandle {
    /// Queues a JSON event for the reader.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError` (carrying the event) if the send fails.
    pub fn send_json(
        &self,
        value: JsonValue,
    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
        self.tx.send(MockStreamEvent::Json(value))
    }

    /// Queues a raw bytes event for the reader.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError` (carrying the event) if the send fails.
    pub fn send_bytes(
        &self,
        value: impl Into<Vec<u8>>,
    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
        self.tx.send(MockStreamEvent::Bytes(value.into()))
    }

    /// Marks the stream as cancelled and queues a `Cancelled` event.
    pub fn cancel(&self) {
        // Set the flag before sending so a reader that receives `Cancelled`
        // already observes `is_cancelled() == true`.
        self.cancelled.store(true, Ordering::SeqCst);
        // A send failure is deliberately ignored; the flag is already set.
        let _ = self.tx.send(MockStreamEvent::Cancelled);
    }

    /// Whether `cancel` has been called on any clone of this handle.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
422
impl MockStreamReader {
    /// Awaits the next event from the channel; `None` is whatever
    /// `UnboundedReceiver::recv` reports as end-of-stream.
    pub async fn next(&mut self) -> Option<MockStreamEvent> {
        self.rx.recv().await
    }

    /// Whether the paired handle has cancelled the stream.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
432
/// A ready-made inbound webhook request for tests.
#[derive(Clone, Debug, PartialEq)]
pub struct WebhookFixture {
    /// The inbound message, including headers and a copy of the body.
    pub raw: RawInbound,
    /// The request body bytes the fixture was built from.
    pub body: Vec<u8>,
}
438
439impl WebhookFixture {
440 pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
441 self.raw.metadata = json!({
442 "binding_id": binding.binding_id,
443 "binding_version": 1,
444 });
445 self
446 }
447
448 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
449 self.raw.tenant_id = Some(TenantId(tenant_id.into()));
450 self
451 }
452}
453
454pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
455 let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
456 let mut raw = RawInbound::new(
457 "webhook",
458 BTreeMap::from([
459 ("content-type".to_string(), "application/json".to_string()),
460 ("x-github-event".to_string(), "ping".to_string()),
461 ("x-github-delivery".to_string(), "delivery-1".to_string()),
462 (
463 "x-hub-signature-256".to_string(),
464 format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
465 ),
466 ]),
467 body.clone(),
468 );
469 raw.received_at = received_at;
470 WebhookFixture { raw, body }
471}
472
473pub fn slack_message_fixture(
474 secret: &str,
475 timestamp: i64,
476 received_at: OffsetDateTime,
477) -> WebhookFixture {
478 let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
479 let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
480 let mut raw = RawInbound::new(
481 "webhook",
482 BTreeMap::from([
483 ("content-type".to_string(), "application/json".to_string()),
484 (
485 "x-slack-request-timestamp".to_string(),
486 timestamp.to_string(),
487 ),
488 (
489 "x-slack-signature".to_string(),
490 format!(
491 "v0={}",
492 hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
493 ),
494 ),
495 ]),
496 body.clone(),
497 );
498 raw.received_at = received_at;
499 WebhookFixture { raw, body }
500}
501
502pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
503 let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
504 let mut raw = RawInbound::new(
505 "webhook",
506 BTreeMap::from([
507 ("content-type".to_string(), "application/json".to_string()),
508 (
509 "linear-signature".to_string(),
510 hmac_sha256_hex(secret.as_bytes(), &body),
511 ),
512 ]),
513 body.clone(),
514 );
515 raw.received_at = received_at;
516 WebhookFixture { raw, body }
517}
518
519pub fn webhook_binding(
520 provider: impl Into<String>,
521 binding_id: impl Into<String>,
522 signing_secret: Option<SecretId>,
523) -> TriggerBinding {
524 let provider = provider.into();
525 let mut binding =
526 TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
527 let mut secrets = serde_json::Map::new();
528 if let Some(secret) = signing_secret {
529 secrets.insert(
530 "signing_secret".to_string(),
531 JsonValue::String(secret.to_string()),
532 );
533 }
534 binding.config = json!({
535 "path": format!("/hooks/{provider}"),
536 "match": {"events": ["*"]},
537 "secrets": secrets,
538 });
539 binding
540}
541
542fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
543 const BLOCK_SIZE: usize = 64;
544 let mut key = if secret.len() > BLOCK_SIZE {
545 Sha256::digest(secret).to_vec()
546 } else {
547 secret.to_vec()
548 };
549 key.resize(BLOCK_SIZE, 0);
550 let mut outer = vec![0x5c; BLOCK_SIZE];
551 let mut inner = vec![0x36; BLOCK_SIZE];
552 for i in 0..BLOCK_SIZE {
553 outer[i] ^= key[i];
554 inner[i] ^= key[i];
555 }
556 let mut inner_hash = Sha256::new();
557 inner_hash.update(&inner);
558 inner_hash.update(data);
559 let inner_result = inner_hash.finalize();
560 let mut outer_hash = Sha256::new();
561 outer_hash.update(&outer);
562 outer_hash.update(inner_result);
563 hex::encode(outer_hash.finalize())
564}
565
566pub async fn advance_until<F>(
567 clock: &MockClock,
568 timeout: StdDuration,
569 tick: StdDuration,
570 mut predicate: F,
571) -> bool
572where
573 F: FnMut() -> bool,
574{
575 let mut elapsed = StdDuration::ZERO;
576 while elapsed <= timeout {
577 if predicate() {
578 return true;
579 }
580 clock.advance_std(tick).await;
581 elapsed += tick;
582 }
583 predicate()
584}
585
// Unit tests for the testkit helpers defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secrets::SecretProvider;

    /// Parses an RFC 3339 timestamp, panicking on malformed input.
    fn parse_ts(value: &str) -> OffsetDateTime {
        OffsetDateTime::parse(value, &time::format_description::well_known::Rfc3339).unwrap()
    }

    #[tokio::test]
    async fn memory_secret_provider_scopes_and_versions_secrets() {
        let mut provider = MemorySecretProvider::new("test");
        // First write through a `Latest` id lands in version 1.
        let scoped = provider.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
        // Second write through the same id allocates version 2.
        provider
            .put(&scoped, SecretBytes::from("v2"))
            .await
            .expect("put latest");

        let latest = provider.get(&scoped).await.expect("latest");
        assert_eq!(latest.with_exposed(|bytes| bytes.to_vec()), b"v2".to_vec());
        // The original value stays readable under its exact version.
        let first = provider
            .get(&scoped.clone().with_version(SecretVersion::Exact(1)))
            .await
            .expect("v1");
        assert_eq!(first.with_exposed(|bytes| bytes.to_vec()), b"v1".to_vec());
        // A different tenant maps to a different name, so the lookup misses.
        assert!(provider
            .get(&scoped_secret_id(
                "github",
                "tenant-b",
                "binding-a",
                "token"
            ))
            .await
            .is_err());
    }

    #[tokio::test]
    async fn connector_testkit_controls_clock_and_deadlines() {
        let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
        let _guard = kit.install_clock();
        let mut fired = false;
        // Predicate never holds: the loop steps at elapsed 0, 10 and 20 ms,
        // i.e. three 10 ms advances.
        assert!(
            !advance_until(
                &kit.clock,
                StdDuration::from_millis(20),
                StdDuration::from_millis(10),
                || fired,
            )
            .await
        );
        fired = true;
        // Predicate already holds: returns before advancing the clock.
        assert!(
            advance_until(
                &kit.clock,
                StdDuration::from_millis(20),
                StdDuration::from_millis(10),
                || fired,
            )
            .await
        );
        // Only the three advances from the first call are expected to count.
        assert_eq!(instant_now().as_millis(), 30);
    }

    #[tokio::test]
    async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
        let (handle, mut reader) = mock_stream();
        handle.send_json(json!({"event": "one"})).expect("send");
        assert_eq!(
            reader.next().await,
            Some(MockStreamEvent::Json(json!({"event": "one"})))
        );
        // Cancelling queues a `Cancelled` event and flips the shared flag.
        handle.cancel();
        assert_eq!(reader.next().await, Some(MockStreamEvent::Cancelled));
        assert!(reader.is_cancelled());
    }

    #[test]
    fn temp_workspace_writes_package_markers() {
        let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
        workspace.write_harn_package("demo").expect("harn package");
        workspace
            .write_cargo_package("demo")
            .expect("cargo package");
        workspace.write_npm_package("demo").expect("npm package");
        // Each writer drops its manifest at the workspace root.
        assert!(workspace.path().join("Harn.toml").exists());
        assert!(workspace.path().join("Cargo.toml").exists());
        assert!(workspace.path().join("package.json").exists());
    }

    #[test]
    fn webhook_fixtures_include_provider_signatures() {
        let received_at = parse_ts("2026-04-19T00:00:00Z");
        let github = github_ping_fixture("topsecret", received_at);
        assert!(github.raw.headers["x-hub-signature-256"].starts_with("sha256="));
        let slack = slack_message_fixture("topsecret", received_at.unix_timestamp(), received_at);
        assert!(slack.raw.headers["x-slack-signature"].starts_with("v0="));
        let linear = linear_issue_update_fixture("topsecret", received_at);
        // Bare hex SHA-256 output: 32 bytes -> 64 hex characters.
        assert_eq!(linear.raw.headers["linear-signature"].len(), 64);
    }
}