Skip to main content

harn_vm/connectors/
testkit.rs

1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use time::OffsetDateTime;
12use tokio::sync::mpsc;
13use uuid::Uuid;
14
15pub use crate::http::framing::{
16    http_content_length_from_header_lines, HttpContentLengthLimitError, TEST_HTTP_MAX_BODY_BYTES,
17};
18pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
19pub use crate::triggers::test_util::clock::{
20    active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
21    ClockInstant, ClockOverrideGuard, MockClock,
22};
23
24use crate::connectors::{
25    ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
26};
27use crate::event_log::{AnyEventLog, MemoryEventLog};
28use crate::secrets::{
29    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
30};
31use crate::triggers::{InboxIndex, ProviderId, TenantId};
32
33#[derive(Clone, Debug)]
34pub struct MemorySecretProvider {
35    provider: String,
36    inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
37}
38
/// All stored versions of a single secret, plus the version `Latest` resolves to.
#[derive(Debug, Default, Clone)]
struct VersionedSecret {
    /// Highest version number written so far; `None` until the first write.
    latest: Option<u64>,
    /// Secret payload for each stored version number.
    versions: BTreeMap<u64, Vec<u8>>,
}
44
45impl MemorySecretProvider {
46    pub fn new(provider: impl Into<String>) -> Self {
47        Self {
48            provider: provider.into(),
49            inner: Arc::new(Mutex::new(BTreeMap::new())),
50        }
51    }
52
53    pub fn empty() -> Self {
54        Self::new("connector-testkit")
55    }
56
57    pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
58        self.insert(id, value);
59        self
60    }
61
62    pub fn with_scoped_secret(
63        self,
64        namespace: impl Into<String>,
65        tenant_id: impl AsRef<str>,
66        binding_id: impl AsRef<str>,
67        name: impl AsRef<str>,
68        value: impl AsRef<[u8]>,
69    ) -> Self {
70        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
71        self.with_secret(id, value)
72    }
73
74    pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
75        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
76        insert_secret(&mut inner, id, value.as_ref().to_vec());
77    }
78
79    pub fn insert_scoped(
80        &mut self,
81        namespace: impl Into<String>,
82        tenant_id: impl AsRef<str>,
83        binding_id: impl AsRef<str>,
84        name: impl AsRef<str>,
85        value: impl AsRef<[u8]>,
86    ) -> SecretId {
87        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
88        self.insert(id.clone(), value);
89        id
90    }
91
92    pub fn snapshot(&self) -> Vec<SecretMeta> {
93        let inner = self.inner.lock().expect("memory secret provider poisoned");
94        inner
95            .iter()
96            .filter_map(|((namespace, name), secret)| {
97                let latest = secret.latest?;
98                Some(SecretMeta {
99                    id: SecretId::new(namespace.clone(), name.clone())
100                        .with_version(SecretVersion::Exact(latest)),
101                    provider: self.provider.clone(),
102                })
103            })
104            .collect()
105    }
106}
107
108#[async_trait]
109impl SecretProvider for MemorySecretProvider {
110    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
111        let inner = self.inner.lock().expect("memory secret provider poisoned");
112        let secret = inner
113            .get(&(id.namespace.clone(), id.name.clone()))
114            .ok_or_else(|| SecretError::NotFound {
115                provider: self.provider.clone(),
116                id: id.clone(),
117            })?;
118        let version = match id.version {
119            SecretVersion::Latest => secret.latest,
120            SecretVersion::Exact(version) => Some(version),
121        }
122        .ok_or_else(|| SecretError::NotFound {
123            provider: self.provider.clone(),
124            id: id.clone(),
125        })?;
126        secret
127            .versions
128            .get(&version)
129            .cloned()
130            .map(SecretBytes::from)
131            .ok_or_else(|| SecretError::NotFound {
132                provider: self.provider.clone(),
133                id: id.clone(),
134            })
135    }
136
137    async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
138        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
139        let value = value.with_exposed(|bytes| bytes.to_vec());
140        insert_secret(&mut inner, id.clone(), value);
141        Ok(())
142    }
143
144    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
145        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
146        let key = (id.namespace.clone(), id.name.clone());
147        let secret = inner.entry(key).or_default();
148        let from_version = secret.latest;
149        let to_version = from_version.unwrap_or(0) + 1;
150        let value = from_version
151            .and_then(|version| secret.versions.get(&version).cloned())
152            .unwrap_or_default();
153        secret.versions.insert(to_version, value);
154        secret.latest = Some(to_version);
155        Ok(RotationHandle {
156            provider: self.provider.clone(),
157            id: SecretId::new(id.namespace.clone(), id.name.clone())
158                .with_version(SecretVersion::Exact(to_version)),
159            from_version,
160            to_version: Some(to_version),
161        })
162    }
163
164    async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
165        let inner = self.inner.lock().expect("memory secret provider poisoned");
166        Ok(inner
167            .iter()
168            .filter(|((namespace, name), _)| {
169                namespace == &prefix.namespace && name.starts_with(&prefix.name)
170            })
171            .filter_map(|((namespace, name), secret)| {
172                let latest = secret.latest?;
173                Some(SecretMeta {
174                    id: SecretId::new(namespace.clone(), name.clone())
175                        .with_version(SecretVersion::Exact(latest)),
176                    provider: self.provider.clone(),
177                })
178            })
179            .collect())
180    }
181
182    fn namespace(&self) -> &str {
183        &self.provider
184    }
185
186    fn supports_versions(&self) -> bool {
187        true
188    }
189}
190
191fn insert_secret(
192    inner: &mut BTreeMap<(String, String), VersionedSecret>,
193    id: SecretId,
194    value: Vec<u8>,
195) {
196    let secret = inner.entry((id.namespace, id.name)).or_default();
197    let version = match id.version {
198        SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
199        SecretVersion::Exact(version) => version,
200    };
201    secret.versions.insert(version, value);
202    secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
203}
204
205pub fn scoped_secret_id(
206    namespace: impl Into<String>,
207    tenant_id: impl AsRef<str>,
208    binding_id: impl AsRef<str>,
209    name: impl AsRef<str>,
210) -> SecretId {
211    SecretId::new(
212        namespace,
213        format!(
214            "tenants/{}/bindings/{}/{}",
215            tenant_id.as_ref(),
216            binding_id.as_ref(),
217            name.as_ref()
218        ),
219    )
220}
221
222#[derive(Clone)]
223pub struct ConnectorTestkit {
224    pub clock: Arc<MockClock>,
225    pub event_log: Arc<AnyEventLog>,
226    pub inbox: Arc<InboxIndex>,
227    pub metrics: Arc<MetricsRegistry>,
228    pub rate_limiter: Arc<RateLimiterFactory>,
229    pub secrets: Arc<MemorySecretProvider>,
230}
231
232impl ConnectorTestkit {
233    pub async fn new(start: OffsetDateTime) -> Self {
234        Self::with_secrets(start, MemorySecretProvider::empty()).await
235    }
236
237    pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
238        let clock = MockClock::new(start);
239        let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
240        let metrics = Arc::new(MetricsRegistry::default());
241        let inbox = Arc::new(
242            InboxIndex::new(event_log.clone(), metrics.clone())
243                .await
244                .expect("connector testkit inbox should initialize"),
245        );
246        Self {
247            clock,
248            event_log,
249            inbox,
250            metrics,
251            rate_limiter: Arc::new(RateLimiterFactory::default()),
252            secrets: Arc::new(secrets),
253        }
254    }
255
256    pub fn ctx(&self) -> ConnectorCtx {
257        ConnectorCtx {
258            event_log: self.event_log.clone(),
259            secrets: self.secrets.clone(),
260            inbox: self.inbox.clone(),
261            metrics: self.metrics.clone(),
262            rate_limiter: self.rate_limiter.clone(),
263        }
264    }
265
266    pub fn install_clock(&self) -> ClockOverrideGuard {
267        install_clock_override(self.clock.clone())
268    }
269}
270
271#[derive(Debug)]
272pub struct TempPackageWorkspace {
273    root: PathBuf,
274}
275
276impl TempPackageWorkspace {
277    pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
278        let root = std::env::temp_dir().join(format!(
279            "{}-{}",
280            prefix.as_ref().trim_matches('-'),
281            Uuid::new_v4()
282        ));
283        fs::create_dir_all(&root)?;
284        Ok(Self { root })
285    }
286
287    pub fn path(&self) -> &Path {
288        &self.root
289    }
290
291    pub fn write_file(
292        &self,
293        relative: impl AsRef<Path>,
294        contents: impl AsRef<[u8]>,
295    ) -> io::Result<PathBuf> {
296        let path = self.root.join(relative.as_ref());
297        if let Some(parent) = path.parent() {
298            fs::create_dir_all(parent)?;
299        }
300        fs::write(&path, contents)?;
301        Ok(path)
302    }
303
304    pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
305        self.write_file(
306            "Harn.toml",
307            format!("[package]\nname = \"{name}\"\nversion = \"0.0.0-test\"\n"),
308        )
309    }
310
311    pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
312        self.write_file(
313            "Cargo.toml",
314            format!("[package]\nname = \"{name}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n"),
315        )
316    }
317
318    pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
319        self.write_file(
320            "package.json",
321            format!("{{\"name\":\"{name}\",\"version\":\"0.0.0-test\"}}\n"),
322        )
323    }
324}
325
326impl Drop for TempPackageWorkspace {
327    fn drop(&mut self) {
328        let _ = fs::remove_dir_all(&self.root);
329    }
330}
331
332pub struct HttpMockGuard;
333
334impl HttpMockGuard {
335    pub fn new() -> Self {
336        crate::http::reset_http_state();
337        Self
338    }
339
340    pub fn push(
341        &self,
342        method: impl Into<String>,
343        url_pattern: impl Into<String>,
344        responses: Vec<HttpMockResponse>,
345    ) {
346        crate::http::push_http_mock(method, url_pattern, responses);
347    }
348
349    pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
350        crate::http::http_mock_calls_snapshot()
351    }
352}
353
354impl Default for HttpMockGuard {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl Drop for HttpMockGuard {
361    fn drop(&mut self) {
362        crate::http::reset_http_state();
363    }
364}
365
366#[derive(Clone, Debug, PartialEq, Eq)]
367pub enum MockStreamEvent {
368    Json(JsonValue),
369    Bytes(Vec<u8>),
370    Cancelled,
371}
372
373#[derive(Clone, Debug)]
374pub struct MockStreamHandle {
375    tx: mpsc::UnboundedSender<MockStreamEvent>,
376    cancelled: Arc<AtomicBool>,
377}
378
379#[derive(Debug)]
380pub struct MockStreamReader {
381    rx: mpsc::UnboundedReceiver<MockStreamEvent>,
382    cancelled: Arc<AtomicBool>,
383}
384
385pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
386    let (tx, rx) = mpsc::unbounded_channel();
387    let cancelled = Arc::new(AtomicBool::new(false));
388    (
389        MockStreamHandle {
390            tx,
391            cancelled: cancelled.clone(),
392        },
393        MockStreamReader { rx, cancelled },
394    )
395}
396
397impl MockStreamHandle {
398    pub fn send_json(
399        &self,
400        value: JsonValue,
401    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
402        self.tx.send(MockStreamEvent::Json(value))
403    }
404
405    pub fn send_bytes(
406        &self,
407        value: impl Into<Vec<u8>>,
408    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
409        self.tx.send(MockStreamEvent::Bytes(value.into()))
410    }
411
412    pub fn cancel(&self) {
413        self.cancelled.store(true, Ordering::SeqCst);
414        let _ = self.tx.send(MockStreamEvent::Cancelled);
415    }
416
417    pub fn is_cancelled(&self) -> bool {
418        self.cancelled.load(Ordering::SeqCst)
419    }
420}
421
422impl MockStreamReader {
423    pub async fn next(&mut self) -> Option<MockStreamEvent> {
424        self.rx.recv().await
425    }
426
427    pub fn is_cancelled(&self) -> bool {
428        self.cancelled.load(Ordering::SeqCst)
429    }
430}
431
432// `RawInbound.metadata` is `serde_json::Value`, which isn't `Eq`.
433#[allow(clippy::derive_partial_eq_without_eq)]
434#[derive(Clone, Debug, PartialEq)]
435pub struct WebhookFixture {
436    pub raw: RawInbound,
437    pub body: Vec<u8>,
438}
439
440impl WebhookFixture {
441    pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
442        self.raw.metadata = json!({
443            "binding_id": binding.binding_id,
444            "binding_version": 1,
445        });
446        self
447    }
448
449    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
450        self.raw.tenant_id = Some(TenantId(tenant_id.into()));
451        self
452    }
453}
454
455pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
456    let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
457    let mut raw = RawInbound::new(
458        "webhook",
459        BTreeMap::from([
460            ("content-type".to_string(), "application/json".to_string()),
461            ("x-github-event".to_string(), "ping".to_string()),
462            ("x-github-delivery".to_string(), "delivery-1".to_string()),
463            (
464                "x-hub-signature-256".to_string(),
465                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
466            ),
467        ]),
468        body.clone(),
469    );
470    raw.received_at = received_at;
471    WebhookFixture { raw, body }
472}
473
474pub fn slack_message_fixture(
475    secret: &str,
476    timestamp: i64,
477    received_at: OffsetDateTime,
478) -> WebhookFixture {
479    let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
480    let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
481    let mut raw = RawInbound::new(
482        "webhook",
483        BTreeMap::from([
484            ("content-type".to_string(), "application/json".to_string()),
485            (
486                "x-slack-request-timestamp".to_string(),
487                timestamp.to_string(),
488            ),
489            (
490                "x-slack-signature".to_string(),
491                format!(
492                    "v0={}",
493                    hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
494                ),
495            ),
496        ]),
497        body.clone(),
498    );
499    raw.received_at = received_at;
500    WebhookFixture { raw, body }
501}
502
503pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
504    let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
505    let mut raw = RawInbound::new(
506        "webhook",
507        BTreeMap::from([
508            ("content-type".to_string(), "application/json".to_string()),
509            (
510                "linear-signature".to_string(),
511                hmac_sha256_hex(secret.as_bytes(), &body),
512            ),
513        ]),
514        body.clone(),
515    );
516    raw.received_at = received_at;
517    WebhookFixture { raw, body }
518}
519
520pub fn notion_page_content_updated_fixture(
521    secret: &str,
522    received_at: OffsetDateTime,
523) -> WebhookFixture {
524    let body = br#"{"id":"evt_1","type":"page.content_updated","workspace_id":"ws_1","subscription_id":"sub_1","integration_id":"int_1","entity":{"id":"page_1","type":"page"},"api_version":"2022-06-28"}"#.to_vec();
525    let mut raw = RawInbound::new(
526        "webhook",
527        BTreeMap::from([
528            ("content-type".to_string(), "application/json".to_string()),
529            (
530                "x-notion-signature".to_string(),
531                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
532            ),
533            ("request-id".to_string(), "req_1".to_string()),
534        ]),
535        body.clone(),
536    );
537    raw.received_at = received_at;
538    WebhookFixture { raw, body }
539}
540
541pub fn webhook_binding(
542    provider: impl Into<String>,
543    binding_id: impl Into<String>,
544    signing_secret: Option<SecretId>,
545) -> TriggerBinding {
546    let provider = provider.into();
547    let mut binding =
548        TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
549    let mut secrets = serde_json::Map::new();
550    if let Some(secret) = signing_secret {
551        secrets.insert(
552            "signing_secret".to_string(),
553            JsonValue::String(secret.to_string()),
554        );
555    }
556    binding.config = json!({
557        "path": format!("/hooks/{provider}"),
558        "match": {"events": ["*"]},
559        "secrets": secrets,
560    });
561    binding
562}
563
564fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
565    hex::encode(crate::connectors::hmac::hmac_sha256(secret, data))
566}
567
568pub async fn advance_until<F>(
569    clock: &MockClock,
570    timeout: StdDuration,
571    tick: StdDuration,
572    mut predicate: F,
573) -> bool
574where
575    F: FnMut() -> bool,
576{
577    let mut elapsed = StdDuration::ZERO;
578    while elapsed <= timeout {
579        if predicate() {
580            return true;
581        }
582        clock.advance_std(tick).await;
583        elapsed += tick;
584    }
585    predicate()
586}
587
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secrets::SecretProvider;

    /// Parses an RFC 3339 timestamp, panicking on malformed input.
    fn parse_ts(value: &str) -> OffsetDateTime {
        let format = time::format_description::well_known::Rfc3339;
        OffsetDateTime::parse(value, &format).unwrap()
    }

    /// Copies the secret's bytes out so they can be compared in assertions.
    fn exposed(secret: SecretBytes) -> Vec<u8> {
        secret.with_exposed(|bytes| bytes.to_vec())
    }

    #[tokio::test]
    async fn memory_secret_provider_scopes_and_versions_secrets() {
        let mut provider = MemorySecretProvider::new("test");
        let token_id = provider.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
        provider
            .put(&token_id, SecretBytes::from("v2"))
            .await
            .expect("put latest");

        // `Latest` resolves to the value written by `put`.
        let latest = provider.get(&token_id).await.expect("latest");
        assert_eq!(exposed(latest), b"v2".to_vec());

        // The first version stays readable by exact version number.
        let first_id = token_id.clone().with_version(SecretVersion::Exact(1));
        let first = provider.get(&first_id).await.expect("v1");
        assert_eq!(exposed(first), b"v1".to_vec());

        // Another tenant's scoped id must not resolve.
        let other_tenant = scoped_secret_id("github", "tenant-b", "binding-a", "token");
        assert!(provider.get(&other_tenant).await.is_err());
    }

    #[tokio::test]
    async fn connector_testkit_controls_clock_and_deadlines() {
        let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
        let _guard = kit.install_clock();
        let timeout = StdDuration::from_millis(20);
        let tick = StdDuration::from_millis(10);

        let mut fired = false;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(!reached);

        fired = true;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(reached);

        assert_eq!(instant_now().as_millis(), 30);
    }

    #[tokio::test]
    async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
        let (handle, mut reader) = mock_stream();

        handle.send_json(json!({"event": "one"})).expect("send");
        let first = reader.next().await;
        assert_eq!(first, Some(MockStreamEvent::Json(json!({"event": "one"}))));

        handle.cancel();
        let second = reader.next().await;
        assert_eq!(second, Some(MockStreamEvent::Cancelled));
        assert!(reader.is_cancelled());
    }

    #[test]
    fn temp_workspace_writes_package_markers() {
        let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
        workspace.write_harn_package("demo").expect("harn package");
        workspace
            .write_cargo_package("demo")
            .expect("cargo package");
        workspace.write_npm_package("demo").expect("npm package");

        let root = workspace.path();
        assert!(root.join("Harn.toml").exists());
        assert!(root.join("Cargo.toml").exists());
        assert!(root.join("package.json").exists());
    }

    #[test]
    fn webhook_fixtures_include_provider_signatures() {
        let received_at = parse_ts("2026-04-19T00:00:00Z");
        let secret = "topsecret";

        let github = github_ping_fixture(secret, received_at);
        assert!(github.raw.headers["x-hub-signature-256"].starts_with("sha256="));

        let slack = slack_message_fixture(secret, received_at.unix_timestamp(), received_at);
        assert!(slack.raw.headers["x-slack-signature"].starts_with("v0="));

        let linear = linear_issue_update_fixture(secret, received_at);
        assert_eq!(linear.raw.headers["linear-signature"].len(), 64);

        let notion = notion_page_content_updated_fixture(secret, received_at);
        assert!(notion.raw.headers["x-notion-signature"].starts_with("sha256="));
    }
}
689}