Skip to main content

harn_vm/connectors/
testkit.rs

1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use time::OffsetDateTime;
12use tokio::sync::mpsc;
13use uuid::Uuid;
14
15pub use crate::http::framing::{
16    http_content_length_from_header_lines, HttpContentLengthLimitError, TEST_HTTP_MAX_BODY_BYTES,
17};
18pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
19pub use crate::triggers::test_util::clock::{
20    active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
21    ClockInstant, ClockOverrideGuard, MockClock,
22};
23
24use crate::connectors::{
25    ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
26};
27use crate::event_log::{AnyEventLog, MemoryEventLog};
28use crate::secrets::{
29    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
30};
31use crate::triggers::{InboxIndex, ProviderId, TenantId};
32
33#[derive(Clone, Debug)]
34pub struct MemorySecretProvider {
35    provider: String,
36    inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
37}
38
/// Every stored version of a single secret, plus a marker for the newest one.
#[derive(Debug, Default, Clone)]
struct VersionedSecret {
    /// Highest version number written so far; `None` until the first write.
    latest: Option<u64>,
    /// Raw secret bytes keyed by version number.
    versions: BTreeMap<u64, Vec<u8>>,
}
44
45impl MemorySecretProvider {
46    pub fn new(provider: impl Into<String>) -> Self {
47        Self {
48            provider: provider.into(),
49            inner: Arc::new(Mutex::new(BTreeMap::new())),
50        }
51    }
52
53    pub fn empty() -> Self {
54        Self::new("connector-testkit")
55    }
56
57    pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
58        self.insert(id, value);
59        self
60    }
61
62    pub fn with_scoped_secret(
63        self,
64        namespace: impl Into<String>,
65        tenant_id: impl AsRef<str>,
66        binding_id: impl AsRef<str>,
67        name: impl AsRef<str>,
68        value: impl AsRef<[u8]>,
69    ) -> Self {
70        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
71        self.with_secret(id, value)
72    }
73
74    pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
75        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
76        insert_secret(&mut inner, id, value.as_ref().to_vec());
77    }
78
79    pub fn insert_scoped(
80        &mut self,
81        namespace: impl Into<String>,
82        tenant_id: impl AsRef<str>,
83        binding_id: impl AsRef<str>,
84        name: impl AsRef<str>,
85        value: impl AsRef<[u8]>,
86    ) -> SecretId {
87        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
88        self.insert(id.clone(), value);
89        id
90    }
91
92    pub fn snapshot(&self) -> Vec<SecretMeta> {
93        let inner = self.inner.lock().expect("memory secret provider poisoned");
94        inner
95            .iter()
96            .filter_map(|((namespace, name), secret)| {
97                let latest = secret.latest?;
98                Some(SecretMeta {
99                    id: SecretId::new(namespace.clone(), name.clone())
100                        .with_version(SecretVersion::Exact(latest)),
101                    provider: self.provider.clone(),
102                })
103            })
104            .collect()
105    }
106}
107
108#[async_trait]
109impl SecretProvider for MemorySecretProvider {
110    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
111        let inner = self.inner.lock().expect("memory secret provider poisoned");
112        let secret = inner
113            .get(&(id.namespace.clone(), id.name.clone()))
114            .ok_or_else(|| SecretError::NotFound {
115                provider: self.provider.clone(),
116                id: id.clone(),
117            })?;
118        let version = match id.version {
119            SecretVersion::Latest => secret.latest,
120            SecretVersion::Exact(version) => Some(version),
121        }
122        .ok_or_else(|| SecretError::NotFound {
123            provider: self.provider.clone(),
124            id: id.clone(),
125        })?;
126        secret
127            .versions
128            .get(&version)
129            .cloned()
130            .map(SecretBytes::from)
131            .ok_or_else(|| SecretError::NotFound {
132                provider: self.provider.clone(),
133                id: id.clone(),
134            })
135    }
136
137    async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
138        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
139        let value = value.with_exposed(|bytes| bytes.to_vec());
140        insert_secret(&mut inner, id.clone(), value);
141        Ok(())
142    }
143
144    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
145        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
146        let key = (id.namespace.clone(), id.name.clone());
147        let secret = inner.entry(key).or_default();
148        let from_version = secret.latest;
149        let to_version = from_version.unwrap_or(0) + 1;
150        let value = from_version
151            .and_then(|version| secret.versions.get(&version).cloned())
152            .unwrap_or_default();
153        secret.versions.insert(to_version, value);
154        secret.latest = Some(to_version);
155        Ok(RotationHandle {
156            provider: self.provider.clone(),
157            id: SecretId::new(id.namespace.clone(), id.name.clone())
158                .with_version(SecretVersion::Exact(to_version)),
159            from_version,
160            to_version: Some(to_version),
161        })
162    }
163
164    async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
165        let inner = self.inner.lock().expect("memory secret provider poisoned");
166        Ok(inner
167            .iter()
168            .filter(|((namespace, name), _)| {
169                namespace == &prefix.namespace && name.starts_with(&prefix.name)
170            })
171            .filter_map(|((namespace, name), secret)| {
172                let latest = secret.latest?;
173                Some(SecretMeta {
174                    id: SecretId::new(namespace.clone(), name.clone())
175                        .with_version(SecretVersion::Exact(latest)),
176                    provider: self.provider.clone(),
177                })
178            })
179            .collect())
180    }
181
182    fn namespace(&self) -> &str {
183        &self.provider
184    }
185
186    fn supports_versions(&self) -> bool {
187        true
188    }
189}
190
191fn insert_secret(
192    inner: &mut BTreeMap<(String, String), VersionedSecret>,
193    id: SecretId,
194    value: Vec<u8>,
195) {
196    let secret = inner.entry((id.namespace, id.name)).or_default();
197    let version = match id.version {
198        SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
199        SecretVersion::Exact(version) => version,
200    };
201    secret.versions.insert(version, value);
202    secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
203}
204
205pub fn scoped_secret_id(
206    namespace: impl Into<String>,
207    tenant_id: impl AsRef<str>,
208    binding_id: impl AsRef<str>,
209    name: impl AsRef<str>,
210) -> SecretId {
211    SecretId::new(
212        namespace,
213        format!(
214            "tenants/{}/bindings/{}/{}",
215            tenant_id.as_ref(),
216            binding_id.as_ref(),
217            name.as_ref()
218        ),
219    )
220}
221
/// Bundle of in-memory collaborators shared by connector tests.
///
/// Every field is an `Arc`, so the derived `Clone` hands out additional
/// handles to the same instances.
#[derive(Clone)]
pub struct ConnectorTestkit {
    /// Mock clock; the constructors in this file create it from the supplied
    /// start time.
    pub clock: Arc<MockClock>,
    /// Event log; the constructors in this file use the in-memory variant.
    pub event_log: Arc<AnyEventLog>,
    /// Inbox index; the constructors in this file build it over `event_log`
    /// and `metrics`.
    pub inbox: Arc<InboxIndex>,
    /// Metrics registry, default-constructed by the constructors in this file.
    pub metrics: Arc<MetricsRegistry>,
    /// Rate limiter factory, default-constructed by the constructors in this
    /// file.
    pub rate_limiter: Arc<RateLimiterFactory>,
    /// Secret store handed to connectors through `ctx()`.
    pub secrets: Arc<MemorySecretProvider>,
}
231
232impl ConnectorTestkit {
233    pub async fn new(start: OffsetDateTime) -> Self {
234        Self::with_secrets(start, MemorySecretProvider::empty()).await
235    }
236
237    pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
238        let clock = MockClock::new(start);
239        let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
240        let metrics = Arc::new(MetricsRegistry::default());
241        let inbox = Arc::new(
242            InboxIndex::new(event_log.clone(), metrics.clone())
243                .await
244                .expect("connector testkit inbox should initialize"),
245        );
246        Self {
247            clock,
248            event_log,
249            inbox,
250            metrics,
251            rate_limiter: Arc::new(RateLimiterFactory::default()),
252            secrets: Arc::new(secrets),
253        }
254    }
255
256    pub fn ctx(&self) -> ConnectorCtx {
257        ConnectorCtx {
258            event_log: self.event_log.clone(),
259            secrets: self.secrets.clone(),
260            inbox: self.inbox.clone(),
261            metrics: self.metrics.clone(),
262            rate_limiter: self.rate_limiter.clone(),
263        }
264    }
265
266    pub fn install_clock(&self) -> ClockOverrideGuard {
267        install_clock_override(self.clock.clone())
268    }
269}
270
/// Scratch directory for package-layout tests.
///
/// The `Drop` implementation in this file removes the directory tree, so the
/// value must be kept alive for as long as the files are needed. `#[must_use]`
/// makes the compiler warn when a freshly created workspace is discarded on
/// the spot, which would delete it again immediately.
#[derive(Debug)]
#[must_use = "the workspace directory is deleted as soon as this value is dropped"]
pub struct TempPackageWorkspace {
    /// Root directory of the workspace.
    root: PathBuf,
}
275
276impl TempPackageWorkspace {
277    pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
278        let root = std::env::temp_dir().join(format!(
279            "{}-{}",
280            prefix.as_ref().trim_matches('-'),
281            Uuid::new_v4()
282        ));
283        fs::create_dir_all(&root)?;
284        Ok(Self { root })
285    }
286
287    pub fn path(&self) -> &Path {
288        &self.root
289    }
290
291    pub fn write_file(
292        &self,
293        relative: impl AsRef<Path>,
294        contents: impl AsRef<[u8]>,
295    ) -> io::Result<PathBuf> {
296        let path = self.root.join(relative.as_ref());
297        if let Some(parent) = path.parent() {
298            fs::create_dir_all(parent)?;
299        }
300        fs::write(&path, contents)?;
301        Ok(path)
302    }
303
304    pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
305        self.write_file(
306            "Harn.toml",
307            format!("[package]\nname = \"{}\"\nversion = \"0.0.0-test\"\n", name),
308        )
309    }
310
311    pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
312        self.write_file(
313            "Cargo.toml",
314            format!(
315                "[package]\nname = \"{}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n",
316                name
317            ),
318        )
319    }
320
321    pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
322        self.write_file(
323            "package.json",
324            format!("{{\"name\":\"{}\",\"version\":\"0.0.0-test\"}}\n", name),
325        )
326    }
327}
328
329impl Drop for TempPackageWorkspace {
330    fn drop(&mut self) {
331        let _ = fs::remove_dir_all(&self.root);
332    }
333}
334
/// Guard around the crate's HTTP mock state.
///
/// The constructor and the `Drop` implementation in this file both reset that
/// state, so the guard must stay bound for as long as the mocks are needed.
/// `#[must_use]` makes the compiler warn when a guard is created and
/// discarded in the same statement.
#[derive(Debug)]
#[must_use = "the HTTP mock state is reset as soon as the guard is dropped"]
pub struct HttpMockGuard;
336
337impl HttpMockGuard {
338    pub fn new() -> Self {
339        crate::http::reset_http_state();
340        Self
341    }
342
343    pub fn push(
344        &self,
345        method: impl Into<String>,
346        url_pattern: impl Into<String>,
347        responses: Vec<HttpMockResponse>,
348    ) {
349        crate::http::push_http_mock(method, url_pattern, responses);
350    }
351
352    pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
353        crate::http::http_mock_calls_snapshot()
354    }
355}
356
357impl Default for HttpMockGuard {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363impl Drop for HttpMockGuard {
364    fn drop(&mut self) {
365        crate::http::reset_http_state();
366    }
367}
368
369#[derive(Clone, Debug, PartialEq)]
370pub enum MockStreamEvent {
371    Json(JsonValue),
372    Bytes(Vec<u8>),
373    Cancelled,
374}
375
376#[derive(Clone, Debug)]
377pub struct MockStreamHandle {
378    tx: mpsc::UnboundedSender<MockStreamEvent>,
379    cancelled: Arc<AtomicBool>,
380}
381
/// Receiving half of a mock stream.
#[derive(Debug)]
pub struct MockStreamReader {
    /// Unbounded receiver fed by the paired handle(s).
    rx: mpsc::UnboundedReceiver<MockStreamEvent>,
    /// Cancellation flag shared with the paired handle(s).
    cancelled: Arc<AtomicBool>,
}
387
388pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
389    let (tx, rx) = mpsc::unbounded_channel();
390    let cancelled = Arc::new(AtomicBool::new(false));
391    (
392        MockStreamHandle {
393            tx,
394            cancelled: cancelled.clone(),
395        },
396        MockStreamReader { rx, cancelled },
397    )
398}
399
400impl MockStreamHandle {
401    pub fn send_json(
402        &self,
403        value: JsonValue,
404    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
405        self.tx.send(MockStreamEvent::Json(value))
406    }
407
408    pub fn send_bytes(
409        &self,
410        value: impl Into<Vec<u8>>,
411    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
412        self.tx.send(MockStreamEvent::Bytes(value.into()))
413    }
414
415    pub fn cancel(&self) {
416        self.cancelled.store(true, Ordering::SeqCst);
417        let _ = self.tx.send(MockStreamEvent::Cancelled);
418    }
419
420    pub fn is_cancelled(&self) -> bool {
421        self.cancelled.load(Ordering::SeqCst)
422    }
423}
424
425impl MockStreamReader {
426    pub async fn next(&mut self) -> Option<MockStreamEvent> {
427        self.rx.recv().await
428    }
429
430    pub fn is_cancelled(&self) -> bool {
431        self.cancelled.load(Ordering::SeqCst)
432    }
433}
434
435#[derive(Clone, Debug, PartialEq)]
436pub struct WebhookFixture {
437    pub raw: RawInbound,
438    pub body: Vec<u8>,
439}
440
441impl WebhookFixture {
442    pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
443        self.raw.metadata = json!({
444            "binding_id": binding.binding_id,
445            "binding_version": 1,
446        });
447        self
448    }
449
450    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
451        self.raw.tenant_id = Some(TenantId(tenant_id.into()));
452        self
453    }
454}
455
456pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
457    let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
458    let mut raw = RawInbound::new(
459        "webhook",
460        BTreeMap::from([
461            ("content-type".to_string(), "application/json".to_string()),
462            ("x-github-event".to_string(), "ping".to_string()),
463            ("x-github-delivery".to_string(), "delivery-1".to_string()),
464            (
465                "x-hub-signature-256".to_string(),
466                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
467            ),
468        ]),
469        body.clone(),
470    );
471    raw.received_at = received_at;
472    WebhookFixture { raw, body }
473}
474
475pub fn slack_message_fixture(
476    secret: &str,
477    timestamp: i64,
478    received_at: OffsetDateTime,
479) -> WebhookFixture {
480    let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
481    let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
482    let mut raw = RawInbound::new(
483        "webhook",
484        BTreeMap::from([
485            ("content-type".to_string(), "application/json".to_string()),
486            (
487                "x-slack-request-timestamp".to_string(),
488                timestamp.to_string(),
489            ),
490            (
491                "x-slack-signature".to_string(),
492                format!(
493                    "v0={}",
494                    hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
495                ),
496            ),
497        ]),
498        body.clone(),
499    );
500    raw.received_at = received_at;
501    WebhookFixture { raw, body }
502}
503
504pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
505    let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
506    let mut raw = RawInbound::new(
507        "webhook",
508        BTreeMap::from([
509            ("content-type".to_string(), "application/json".to_string()),
510            (
511                "linear-signature".to_string(),
512                hmac_sha256_hex(secret.as_bytes(), &body),
513            ),
514        ]),
515        body.clone(),
516    );
517    raw.received_at = received_at;
518    WebhookFixture { raw, body }
519}
520
521pub fn notion_page_content_updated_fixture(
522    secret: &str,
523    received_at: OffsetDateTime,
524) -> WebhookFixture {
525    let body = br#"{"id":"evt_1","type":"page.content_updated","workspace_id":"ws_1","subscription_id":"sub_1","integration_id":"int_1","entity":{"id":"page_1","type":"page"},"api_version":"2022-06-28"}"#.to_vec();
526    let mut raw = RawInbound::new(
527        "webhook",
528        BTreeMap::from([
529            ("content-type".to_string(), "application/json".to_string()),
530            (
531                "x-notion-signature".to_string(),
532                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
533            ),
534            ("request-id".to_string(), "req_1".to_string()),
535        ]),
536        body.clone(),
537    );
538    raw.received_at = received_at;
539    WebhookFixture { raw, body }
540}
541
542pub fn webhook_binding(
543    provider: impl Into<String>,
544    binding_id: impl Into<String>,
545    signing_secret: Option<SecretId>,
546) -> TriggerBinding {
547    let provider = provider.into();
548    let mut binding =
549        TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
550    let mut secrets = serde_json::Map::new();
551    if let Some(secret) = signing_secret {
552        secrets.insert(
553            "signing_secret".to_string(),
554            JsonValue::String(secret.to_string()),
555        );
556    }
557    binding.config = json!({
558        "path": format!("/hooks/{provider}"),
559        "match": {"events": ["*"]},
560        "secrets": secrets,
561    });
562    binding
563}
564
565fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
566    hex::encode(crate::connectors::hmac::hmac_sha256(secret, data))
567}
568
569pub async fn advance_until<F>(
570    clock: &MockClock,
571    timeout: StdDuration,
572    tick: StdDuration,
573    mut predicate: F,
574) -> bool
575where
576    F: FnMut() -> bool,
577{
578    let mut elapsed = StdDuration::ZERO;
579    while elapsed <= timeout {
580        if predicate() {
581            return true;
582        }
583        clock.advance_std(tick).await;
584        elapsed += tick;
585    }
586    predicate()
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secrets::SecretProvider;

    /// Parses an RFC 3339 timestamp, panicking on malformed input.
    fn parse_ts(value: &str) -> OffsetDateTime {
        use time::format_description::well_known::Rfc3339;

        OffsetDateTime::parse(value, &Rfc3339).unwrap()
    }

    /// Versions are allocated per secret and scoped ids isolate tenants.
    #[tokio::test]
    async fn memory_secret_provider_scopes_and_versions_secrets() {
        let mut provider = MemorySecretProvider::new("test");
        let scoped = provider.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
        let stored = provider.put(&scoped, SecretBytes::from("v2")).await;
        stored.expect("put latest");

        let latest = provider.get(&scoped).await.expect("latest");
        assert_eq!(latest.with_exposed(|bytes| bytes.to_vec()), b"v2".to_vec());

        let pinned = scoped.clone().with_version(SecretVersion::Exact(1));
        let first = provider.get(&pinned).await.expect("v1");
        assert_eq!(first.with_exposed(|bytes| bytes.to_vec()), b"v1".to_vec());

        let other_tenant = scoped_secret_id("github", "tenant-b", "binding-a", "token");
        let missing = provider.get(&other_tenant).await;
        assert!(missing.is_err());
    }

    /// `advance_until` drives the mock clock and reports the predicate result.
    #[tokio::test]
    async fn connector_testkit_controls_clock_and_deadlines() {
        let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
        let _guard = kit.install_clock();
        let timeout = StdDuration::from_millis(20);
        let tick = StdDuration::from_millis(10);

        let mut fired = false;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(!reached);

        fired = true;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(reached);

        assert_eq!(instant_now().as_millis(), 30);
    }

    /// Cancellation is delivered through the channel, with no sleeping.
    #[tokio::test]
    async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
        let (handle, mut reader) = mock_stream();
        handle.send_json(json!({"event": "one"})).expect("send");

        let first = reader.next().await;
        assert_eq!(first, Some(MockStreamEvent::Json(json!({"event": "one"}))));

        handle.cancel();
        let second = reader.next().await;
        assert_eq!(second, Some(MockStreamEvent::Cancelled));
        assert!(reader.is_cancelled());
    }

    /// Each package writer drops its marker file in the workspace root.
    #[test]
    fn temp_workspace_writes_package_markers() {
        let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
        workspace.write_harn_package("demo").expect("harn package");
        workspace
            .write_cargo_package("demo")
            .expect("cargo package");
        workspace.write_npm_package("demo").expect("npm package");

        for marker in ["Harn.toml", "Cargo.toml", "package.json"] {
            assert!(workspace.path().join(marker).exists());
        }
    }

    /// Every fixture carries the signature header its provider expects.
    #[test]
    fn webhook_fixtures_include_provider_signatures() {
        let received_at = parse_ts("2026-04-19T00:00:00Z");

        let github = github_ping_fixture("topsecret", received_at);
        assert!(github.raw.headers["x-hub-signature-256"].starts_with("sha256="));

        let slack = slack_message_fixture("topsecret", received_at.unix_timestamp(), received_at);
        assert!(slack.raw.headers["x-slack-signature"].starts_with("v0="));

        let linear = linear_issue_update_fixture("topsecret", received_at);
        assert_eq!(linear.raw.headers["linear-signature"].len(), 64);

        let notion = notion_page_content_updated_fixture("topsecret", received_at);
        assert!(notion.raw.headers["x-notion-signature"].starts_with("sha256="));
    }
}