// harn_vm/connectors/testkit.rs

1use std::collections::BTreeMap;
2use std::fs;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::Duration as StdDuration;
8
9use async_trait::async_trait;
10use serde_json::{json, Value as JsonValue};
11use sha2::{Digest, Sha256};
12use time::OffsetDateTime;
13use tokio::sync::mpsc;
14use uuid::Uuid;
15
16pub use crate::http::{HttpMockCallSnapshot, HttpMockResponse};
17pub use crate::triggers::test_util::clock::{
18    active_mock_clock, install_override as install_clock_override, instant_now, now_ms, now_utc,
19    ClockInstant, ClockOverrideGuard, MockClock,
20};
21
22use crate::connectors::{
23    ConnectorCtx, MetricsRegistry, RateLimiterFactory, RawInbound, TriggerBinding,
24};
25use crate::event_log::{AnyEventLog, MemoryEventLog};
26use crate::secrets::{
27    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider, SecretVersion,
28};
29use crate::triggers::{InboxIndex, ProviderId, TenantId};
30
31#[derive(Clone, Debug)]
32pub struct MemorySecretProvider {
33    provider: String,
34    inner: Arc<Mutex<BTreeMap<(String, String), VersionedSecret>>>,
35}
36
37#[derive(Clone, Debug, Default)]
38struct VersionedSecret {
39    latest: Option<u64>,
40    versions: BTreeMap<u64, Vec<u8>>,
41}
42
43impl MemorySecretProvider {
44    pub fn new(provider: impl Into<String>) -> Self {
45        Self {
46            provider: provider.into(),
47            inner: Arc::new(Mutex::new(BTreeMap::new())),
48        }
49    }
50
51    pub fn empty() -> Self {
52        Self::new("connector-testkit")
53    }
54
55    pub fn with_secret(mut self, id: SecretId, value: impl AsRef<[u8]>) -> Self {
56        self.insert(id, value);
57        self
58    }
59
60    pub fn with_scoped_secret(
61        self,
62        namespace: impl Into<String>,
63        tenant_id: impl AsRef<str>,
64        binding_id: impl AsRef<str>,
65        name: impl AsRef<str>,
66        value: impl AsRef<[u8]>,
67    ) -> Self {
68        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
69        self.with_secret(id, value)
70    }
71
72    pub fn insert(&mut self, id: SecretId, value: impl AsRef<[u8]>) {
73        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
74        insert_secret(&mut inner, id, value.as_ref().to_vec());
75    }
76
77    pub fn insert_scoped(
78        &mut self,
79        namespace: impl Into<String>,
80        tenant_id: impl AsRef<str>,
81        binding_id: impl AsRef<str>,
82        name: impl AsRef<str>,
83        value: impl AsRef<[u8]>,
84    ) -> SecretId {
85        let id = scoped_secret_id(namespace, tenant_id, binding_id, name);
86        self.insert(id.clone(), value);
87        id
88    }
89
90    pub fn snapshot(&self) -> Vec<SecretMeta> {
91        let inner = self.inner.lock().expect("memory secret provider poisoned");
92        inner
93            .iter()
94            .filter_map(|((namespace, name), secret)| {
95                let latest = secret.latest?;
96                Some(SecretMeta {
97                    id: SecretId::new(namespace.clone(), name.clone())
98                        .with_version(SecretVersion::Exact(latest)),
99                    provider: self.provider.clone(),
100                })
101            })
102            .collect()
103    }
104}
105
106#[async_trait]
107impl SecretProvider for MemorySecretProvider {
108    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
109        let inner = self.inner.lock().expect("memory secret provider poisoned");
110        let secret = inner
111            .get(&(id.namespace.clone(), id.name.clone()))
112            .ok_or_else(|| SecretError::NotFound {
113                provider: self.provider.clone(),
114                id: id.clone(),
115            })?;
116        let version = match id.version {
117            SecretVersion::Latest => secret.latest,
118            SecretVersion::Exact(version) => Some(version),
119        }
120        .ok_or_else(|| SecretError::NotFound {
121            provider: self.provider.clone(),
122            id: id.clone(),
123        })?;
124        secret
125            .versions
126            .get(&version)
127            .cloned()
128            .map(SecretBytes::from)
129            .ok_or_else(|| SecretError::NotFound {
130                provider: self.provider.clone(),
131                id: id.clone(),
132            })
133    }
134
135    async fn put(&self, id: &SecretId, value: SecretBytes) -> Result<(), SecretError> {
136        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
137        let value = value.with_exposed(|bytes| bytes.to_vec());
138        insert_secret(&mut inner, id.clone(), value);
139        Ok(())
140    }
141
142    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
143        let mut inner = self.inner.lock().expect("memory secret provider poisoned");
144        let key = (id.namespace.clone(), id.name.clone());
145        let secret = inner.entry(key).or_default();
146        let from_version = secret.latest;
147        let to_version = from_version.unwrap_or(0) + 1;
148        let value = from_version
149            .and_then(|version| secret.versions.get(&version).cloned())
150            .unwrap_or_default();
151        secret.versions.insert(to_version, value);
152        secret.latest = Some(to_version);
153        Ok(RotationHandle {
154            provider: self.provider.clone(),
155            id: SecretId::new(id.namespace.clone(), id.name.clone())
156                .with_version(SecretVersion::Exact(to_version)),
157            from_version,
158            to_version: Some(to_version),
159        })
160    }
161
162    async fn list(&self, prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
163        let inner = self.inner.lock().expect("memory secret provider poisoned");
164        Ok(inner
165            .iter()
166            .filter(|((namespace, name), _)| {
167                namespace == &prefix.namespace && name.starts_with(&prefix.name)
168            })
169            .filter_map(|((namespace, name), secret)| {
170                let latest = secret.latest?;
171                Some(SecretMeta {
172                    id: SecretId::new(namespace.clone(), name.clone())
173                        .with_version(SecretVersion::Exact(latest)),
174                    provider: self.provider.clone(),
175                })
176            })
177            .collect())
178    }
179
180    fn namespace(&self) -> &str {
181        &self.provider
182    }
183
184    fn supports_versions(&self) -> bool {
185        true
186    }
187}
188
189fn insert_secret(
190    inner: &mut BTreeMap<(String, String), VersionedSecret>,
191    id: SecretId,
192    value: Vec<u8>,
193) {
194    let secret = inner.entry((id.namespace, id.name)).or_default();
195    let version = match id.version {
196        SecretVersion::Latest => secret.latest.unwrap_or(0) + 1,
197        SecretVersion::Exact(version) => version,
198    };
199    secret.versions.insert(version, value);
200    secret.latest = Some(secret.latest.map_or(version, |latest| latest.max(version)));
201}
202
203pub fn scoped_secret_id(
204    namespace: impl Into<String>,
205    tenant_id: impl AsRef<str>,
206    binding_id: impl AsRef<str>,
207    name: impl AsRef<str>,
208) -> SecretId {
209    SecretId::new(
210        namespace,
211        format!(
212            "tenants/{}/bindings/{}/{}",
213            tenant_id.as_ref(),
214            binding_id.as_ref(),
215            name.as_ref()
216        ),
217    )
218}
219
220#[derive(Clone)]
221pub struct ConnectorTestkit {
222    pub clock: Arc<MockClock>,
223    pub event_log: Arc<AnyEventLog>,
224    pub inbox: Arc<InboxIndex>,
225    pub metrics: Arc<MetricsRegistry>,
226    pub rate_limiter: Arc<RateLimiterFactory>,
227    pub secrets: Arc<MemorySecretProvider>,
228}
229
230impl ConnectorTestkit {
231    pub async fn new(start: OffsetDateTime) -> Self {
232        Self::with_secrets(start, MemorySecretProvider::empty()).await
233    }
234
235    pub async fn with_secrets(start: OffsetDateTime, secrets: MemorySecretProvider) -> Self {
236        let clock = MockClock::new(start);
237        let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
238        let metrics = Arc::new(MetricsRegistry::default());
239        let inbox = Arc::new(
240            InboxIndex::new(event_log.clone(), metrics.clone())
241                .await
242                .expect("connector testkit inbox should initialize"),
243        );
244        Self {
245            clock,
246            event_log,
247            inbox,
248            metrics,
249            rate_limiter: Arc::new(RateLimiterFactory::default()),
250            secrets: Arc::new(secrets),
251        }
252    }
253
254    pub fn ctx(&self) -> ConnectorCtx {
255        ConnectorCtx {
256            event_log: self.event_log.clone(),
257            secrets: self.secrets.clone(),
258            inbox: self.inbox.clone(),
259            metrics: self.metrics.clone(),
260            rate_limiter: self.rate_limiter.clone(),
261        }
262    }
263
264    pub fn install_clock(&self) -> ClockOverrideGuard {
265        install_clock_override(self.clock.clone())
266    }
267}
268
269#[derive(Debug)]
270pub struct TempPackageWorkspace {
271    root: PathBuf,
272}
273
274impl TempPackageWorkspace {
275    pub fn new(prefix: impl AsRef<str>) -> io::Result<Self> {
276        let root = std::env::temp_dir().join(format!(
277            "{}-{}",
278            prefix.as_ref().trim_matches('-'),
279            Uuid::new_v4()
280        ));
281        fs::create_dir_all(&root)?;
282        Ok(Self { root })
283    }
284
285    pub fn path(&self) -> &Path {
286        &self.root
287    }
288
289    pub fn write_file(
290        &self,
291        relative: impl AsRef<Path>,
292        contents: impl AsRef<[u8]>,
293    ) -> io::Result<PathBuf> {
294        let path = self.root.join(relative.as_ref());
295        if let Some(parent) = path.parent() {
296            fs::create_dir_all(parent)?;
297        }
298        fs::write(&path, contents)?;
299        Ok(path)
300    }
301
302    pub fn write_harn_package(&self, name: &str) -> io::Result<PathBuf> {
303        self.write_file(
304            "Harn.toml",
305            format!("[package]\nname = \"{}\"\nversion = \"0.0.0-test\"\n", name),
306        )
307    }
308
309    pub fn write_cargo_package(&self, name: &str) -> io::Result<PathBuf> {
310        self.write_file(
311            "Cargo.toml",
312            format!(
313                "[package]\nname = \"{}\"\nversion = \"0.0.0\"\nedition = \"2021\"\n",
314                name
315            ),
316        )
317    }
318
319    pub fn write_npm_package(&self, name: &str) -> io::Result<PathBuf> {
320        self.write_file(
321            "package.json",
322            format!("{{\"name\":\"{}\",\"version\":\"0.0.0-test\"}}\n", name),
323        )
324    }
325}
326
327impl Drop for TempPackageWorkspace {
328    fn drop(&mut self) {
329        let _ = fs::remove_dir_all(&self.root);
330    }
331}
332
333pub struct HttpMockGuard;
334
335impl HttpMockGuard {
336    pub fn new() -> Self {
337        crate::http::reset_http_state();
338        Self
339    }
340
341    pub fn push(
342        &self,
343        method: impl Into<String>,
344        url_pattern: impl Into<String>,
345        responses: Vec<HttpMockResponse>,
346    ) {
347        crate::http::push_http_mock(method, url_pattern, responses);
348    }
349
350    pub fn calls(&self) -> Vec<HttpMockCallSnapshot> {
351        crate::http::http_mock_calls_snapshot()
352    }
353}
354
355impl Default for HttpMockGuard {
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361impl Drop for HttpMockGuard {
362    fn drop(&mut self) {
363        crate::http::reset_http_state();
364    }
365}
366
367#[derive(Clone, Debug, PartialEq)]
368pub enum MockStreamEvent {
369    Json(JsonValue),
370    Bytes(Vec<u8>),
371    Cancelled,
372}
373
374#[derive(Clone, Debug)]
375pub struct MockStreamHandle {
376    tx: mpsc::UnboundedSender<MockStreamEvent>,
377    cancelled: Arc<AtomicBool>,
378}
379
380#[derive(Debug)]
381pub struct MockStreamReader {
382    rx: mpsc::UnboundedReceiver<MockStreamEvent>,
383    cancelled: Arc<AtomicBool>,
384}
385
386pub fn mock_stream() -> (MockStreamHandle, MockStreamReader) {
387    let (tx, rx) = mpsc::unbounded_channel();
388    let cancelled = Arc::new(AtomicBool::new(false));
389    (
390        MockStreamHandle {
391            tx,
392            cancelled: cancelled.clone(),
393        },
394        MockStreamReader { rx, cancelled },
395    )
396}
397
398impl MockStreamHandle {
399    pub fn send_json(
400        &self,
401        value: JsonValue,
402    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
403        self.tx.send(MockStreamEvent::Json(value))
404    }
405
406    pub fn send_bytes(
407        &self,
408        value: impl Into<Vec<u8>>,
409    ) -> Result<(), mpsc::error::SendError<MockStreamEvent>> {
410        self.tx.send(MockStreamEvent::Bytes(value.into()))
411    }
412
413    pub fn cancel(&self) {
414        self.cancelled.store(true, Ordering::SeqCst);
415        let _ = self.tx.send(MockStreamEvent::Cancelled);
416    }
417
418    pub fn is_cancelled(&self) -> bool {
419        self.cancelled.load(Ordering::SeqCst)
420    }
421}
422
423impl MockStreamReader {
424    pub async fn next(&mut self) -> Option<MockStreamEvent> {
425        self.rx.recv().await
426    }
427
428    pub fn is_cancelled(&self) -> bool {
429        self.cancelled.load(Ordering::SeqCst)
430    }
431}
432
433#[derive(Clone, Debug, PartialEq)]
434pub struct WebhookFixture {
435    pub raw: RawInbound,
436    pub body: Vec<u8>,
437}
438
439impl WebhookFixture {
440    pub fn with_binding(mut self, binding: &TriggerBinding) -> Self {
441        self.raw.metadata = json!({
442            "binding_id": binding.binding_id,
443            "binding_version": 1,
444        });
445        self
446    }
447
448    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
449        self.raw.tenant_id = Some(TenantId(tenant_id.into()));
450        self
451    }
452}
453
454pub fn github_ping_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
455    let body = br#"{"zen":"Keep it logically awesome.","hook_id":42}"#.to_vec();
456    let mut raw = RawInbound::new(
457        "webhook",
458        BTreeMap::from([
459            ("content-type".to_string(), "application/json".to_string()),
460            ("x-github-event".to_string(), "ping".to_string()),
461            ("x-github-delivery".to_string(), "delivery-1".to_string()),
462            (
463                "x-hub-signature-256".to_string(),
464                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
465            ),
466        ]),
467        body.clone(),
468    );
469    raw.received_at = received_at;
470    WebhookFixture { raw, body }
471}
472
473pub fn slack_message_fixture(
474    secret: &str,
475    timestamp: i64,
476    received_at: OffsetDateTime,
477) -> WebhookFixture {
478    let body = br#"{"type":"event_callback","event_id":"Ev1","team_id":"T1","event":{"type":"message","channel_type":"channel","channel":"C1","user":"U1","text":"hello","event_ts":"1710000000.000100"}}"#.to_vec();
479    let signed = format!("v0:{timestamp}:{}", String::from_utf8_lossy(&body));
480    let mut raw = RawInbound::new(
481        "webhook",
482        BTreeMap::from([
483            ("content-type".to_string(), "application/json".to_string()),
484            (
485                "x-slack-request-timestamp".to_string(),
486                timestamp.to_string(),
487            ),
488            (
489                "x-slack-signature".to_string(),
490                format!(
491                    "v0={}",
492                    hmac_sha256_hex(secret.as_bytes(), signed.as_bytes())
493                ),
494            ),
495        ]),
496        body.clone(),
497    );
498    raw.received_at = received_at;
499    WebhookFixture { raw, body }
500}
501
502pub fn linear_issue_update_fixture(secret: &str, received_at: OffsetDateTime) -> WebhookFixture {
503    let body = br#"{"type":"Issue","action":"update","createdAt":"2026-04-19T00:00:00Z","data":{"id":"issue-1","identifier":"ENG-1","title":"connector"},"updatedFrom":{"title":"old"}}"#.to_vec();
504    let mut raw = RawInbound::new(
505        "webhook",
506        BTreeMap::from([
507            ("content-type".to_string(), "application/json".to_string()),
508            (
509                "linear-signature".to_string(),
510                hmac_sha256_hex(secret.as_bytes(), &body),
511            ),
512        ]),
513        body.clone(),
514    );
515    raw.received_at = received_at;
516    WebhookFixture { raw, body }
517}
518
519pub fn notion_page_content_updated_fixture(
520    secret: &str,
521    received_at: OffsetDateTime,
522) -> WebhookFixture {
523    let body = br#"{"id":"evt_1","type":"page.content_updated","workspace_id":"ws_1","subscription_id":"sub_1","integration_id":"int_1","entity":{"id":"page_1","type":"page"},"api_version":"2022-06-28"}"#.to_vec();
524    let mut raw = RawInbound::new(
525        "webhook",
526        BTreeMap::from([
527            ("content-type".to_string(), "application/json".to_string()),
528            (
529                "x-notion-signature".to_string(),
530                format!("sha256={}", hmac_sha256_hex(secret.as_bytes(), &body)),
531            ),
532            ("request-id".to_string(), "req_1".to_string()),
533        ]),
534        body.clone(),
535    );
536    raw.received_at = received_at;
537    WebhookFixture { raw, body }
538}
539
540pub fn webhook_binding(
541    provider: impl Into<String>,
542    binding_id: impl Into<String>,
543    signing_secret: Option<SecretId>,
544) -> TriggerBinding {
545    let provider = provider.into();
546    let mut binding =
547        TriggerBinding::new(ProviderId::from(provider.clone()), "webhook", binding_id);
548    let mut secrets = serde_json::Map::new();
549    if let Some(secret) = signing_secret {
550        secrets.insert(
551            "signing_secret".to_string(),
552            JsonValue::String(secret.to_string()),
553        );
554    }
555    binding.config = json!({
556        "path": format!("/hooks/{provider}"),
557        "match": {"events": ["*"]},
558        "secrets": secrets,
559    });
560    binding
561}
562
563fn hmac_sha256_hex(secret: &[u8], data: &[u8]) -> String {
564    const BLOCK_SIZE: usize = 64;
565    let mut key = if secret.len() > BLOCK_SIZE {
566        Sha256::digest(secret).to_vec()
567    } else {
568        secret.to_vec()
569    };
570    key.resize(BLOCK_SIZE, 0);
571    let mut outer = vec![0x5c; BLOCK_SIZE];
572    let mut inner = vec![0x36; BLOCK_SIZE];
573    for i in 0..BLOCK_SIZE {
574        outer[i] ^= key[i];
575        inner[i] ^= key[i];
576    }
577    let mut inner_hash = Sha256::new();
578    inner_hash.update(&inner);
579    inner_hash.update(data);
580    let inner_result = inner_hash.finalize();
581    let mut outer_hash = Sha256::new();
582    outer_hash.update(&outer);
583    outer_hash.update(inner_result);
584    hex::encode(outer_hash.finalize())
585}
586
587pub async fn advance_until<F>(
588    clock: &MockClock,
589    timeout: StdDuration,
590    tick: StdDuration,
591    mut predicate: F,
592) -> bool
593where
594    F: FnMut() -> bool,
595{
596    let mut elapsed = StdDuration::ZERO;
597    while elapsed <= timeout {
598        if predicate() {
599            return true;
600        }
601        clock.advance_std(tick).await;
602        elapsed += tick;
603    }
604    predicate()
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use crate::secrets::SecretProvider;

    /// Parses an RFC 3339 timestamp, panicking on malformed test input.
    fn parse_ts(value: &str) -> OffsetDateTime {
        let format = time::format_description::well_known::Rfc3339;
        OffsetDateTime::parse(value, &format).unwrap()
    }

    // `put` with an unversioned id creates version 2; version 1 stays readable and other
    // tenants' ids do not resolve.
    #[tokio::test]
    async fn memory_secret_provider_scopes_and_versions_secrets() {
        let mut secrets = MemorySecretProvider::new("test");
        let token_id = secrets.insert_scoped("github", "tenant-a", "binding-a", "token", "v1");
        secrets
            .put(&token_id, SecretBytes::from("v2"))
            .await
            .expect("put latest");

        let newest = secrets.get(&token_id).await.expect("latest");
        assert_eq!(newest.with_exposed(|bytes| bytes.to_vec()), b"v2".to_vec());

        let pinned_id = token_id.clone().with_version(SecretVersion::Exact(1));
        let oldest = secrets.get(&pinned_id).await.expect("v1");
        assert_eq!(oldest.with_exposed(|bytes| bytes.to_vec()), b"v1".to_vec());

        let other_tenant = scoped_secret_id("github", "tenant-b", "binding-a", "token");
        assert!(secrets.get(&other_tenant).await.is_err());
    }

    // The failed wait advances the mock clock by 30 ms; the satisfied wait returns
    // without advancing it further.
    #[tokio::test]
    async fn connector_testkit_controls_clock_and_deadlines() {
        let kit = ConnectorTestkit::new(parse_ts("2026-04-19T00:00:00Z")).await;
        let _guard = kit.install_clock();
        let timeout = StdDuration::from_millis(20);
        let tick = StdDuration::from_millis(10);

        let mut fired = false;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(!reached);

        fired = true;
        let reached = advance_until(&kit.clock, timeout, tick, || fired).await;
        assert!(reached);
        assert_eq!(instant_now().as_millis(), 30);
    }

    #[tokio::test]
    async fn mock_stream_cancels_reader_without_wall_clock_sleep() {
        let (handle, mut reader) = mock_stream();
        handle.send_json(json!({"event": "one"})).expect("send");
        let first = reader.next().await;
        assert_eq!(first, Some(MockStreamEvent::Json(json!({"event": "one"}))));

        handle.cancel();
        let second = reader.next().await;
        assert_eq!(second, Some(MockStreamEvent::Cancelled));
        assert!(reader.is_cancelled());
    }

    #[test]
    fn temp_workspace_writes_package_markers() {
        let workspace = TempPackageWorkspace::new("harn-testkit").expect("workspace");
        workspace.write_harn_package("demo").expect("harn package");
        workspace
            .write_cargo_package("demo")
            .expect("cargo package");
        workspace.write_npm_package("demo").expect("npm package");

        let root = workspace.path();
        assert!(root.join("Harn.toml").exists());
        assert!(root.join("Cargo.toml").exists());
        assert!(root.join("package.json").exists());
    }

    #[test]
    fn webhook_fixtures_include_provider_signatures() {
        let received_at = parse_ts("2026-04-19T00:00:00Z");
        let secret = "topsecret";

        let github = github_ping_fixture(secret, received_at);
        assert!(github.raw.headers["x-hub-signature-256"].starts_with("sha256="));

        let slack = slack_message_fixture(secret, received_at.unix_timestamp(), received_at);
        assert!(slack.raw.headers["x-slack-signature"].starts_with("v0="));

        let linear = linear_issue_update_fixture(secret, received_at);
        assert_eq!(linear.raw.headers["linear-signature"].len(), 64);

        let notion = notion_page_content_updated_fixture(secret, received_at);
        assert!(notion.raw.headers["x-notion-signature"].starts_with("sha256="));
    }
}