Skip to main content

harn_vm/connectors/
stream.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Arc, RwLock};
3
4use async_trait::async_trait;
5use serde_json::{json, Value as JsonValue};
6use sha2::{Digest, Sha256};
7use time::OffsetDateTime;
8
9use crate::connectors::{
10    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
11    ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
12};
13use crate::triggers::{
14    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus,
15    StreamGateOutcome, StreamTriggerRuntime, TraceId, TriggerEvent, TriggerEventId,
16};
17
18pub struct StreamConnector {
19    provider_id: ProviderId,
20    kinds: Vec<TriggerKind>,
21    schema_name: String,
22    client: Arc<StreamClient>,
23    state: RwLock<ConnectorState>,
24}
25
26#[derive(Default)]
27struct ConnectorState {
28    ctx: Option<ConnectorCtx>,
29    bindings: HashMap<String, ActivatedStreamBinding>,
30}
31
32#[derive(Clone, Debug)]
33struct ActivatedStreamBinding {
34    match_events: Vec<String>,
35    stream: JsonValue,
36}
37
/// Stateless outbound client for the stream connector; every call is rejected.
#[derive(Default)]
struct StreamClient;
40
41#[async_trait]
42impl ConnectorClient for StreamClient {
43    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
44        Err(ClientError::MethodNotFound(format!(
45            "stream connector has no outbound method `{method}`"
46        )))
47    }
48}
49
50impl StreamConnector {
51    pub fn new(provider_id: ProviderId, schema_name: impl Into<String>) -> Self {
52        Self {
53            provider_id,
54            kinds: vec![TriggerKind::from("stream")],
55            schema_name: schema_name.into(),
56            client: Arc::new(StreamClient),
57            state: RwLock::new(ConnectorState::default()),
58        }
59    }
60
61    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedStreamBinding, ConnectorError> {
62        let state = self.state.read().expect("stream connector state poisoned");
63        let binding = if let Some(binding_id) =
64            raw.metadata.get("binding_id").and_then(JsonValue::as_str)
65        {
66            state.bindings.get(binding_id).cloned().ok_or_else(|| {
67                ConnectorError::Unsupported(format!(
68                    "stream connector has no active binding `{binding_id}`"
69                ))
70            })?
71        } else if state.bindings.len() == 1 {
72            state
73                .bindings
74                .values()
75                .next()
76                .cloned()
77                .expect("checked single binding")
78        } else {
79            return Err(ConnectorError::Unsupported(
80                "stream connector requires raw.metadata.binding_id when multiple bindings are active"
81                    .to_string(),
82            ));
83        };
84        Ok(binding)
85    }
86
87    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
88        self.state
89            .read()
90            .expect("stream connector state poisoned")
91            .ctx
92            .clone()
93            .ok_or_else(|| {
94                ConnectorError::Activation(
95                    "stream connector must be initialized before use".to_string(),
96                )
97            })
98    }
99
100    /// Normalize provider-native input and admit it through the streaming
101    /// trigger runtime before dispatcher execution.
102    pub async fn push_inbound(
103        &self,
104        runtime: &mut StreamTriggerRuntime,
105        raw: RawInbound,
106    ) -> Result<Vec<crate::triggers::DispatchOutcome>, ConnectorError> {
107        let event = self.normalize_inbound(raw).await?;
108        runtime
109            .push_event(event)
110            .await
111            .map_err(|error| ConnectorError::HarnRuntime(error.to_string()))
112    }
113
114    /// Test and replay hook for deterministic stream gates. Production LLM
115    /// gates should persist their decision through the runtime cache before
116    /// dispatch, then replay by cache key.
117    pub async fn push_inbound_with_gate(
118        &self,
119        runtime: &mut StreamTriggerRuntime,
120        raw: RawInbound,
121        gate: impl Fn(&crate::triggers::StreamWindowEnvelope) -> StreamGateOutcome,
122    ) -> Result<Vec<crate::triggers::DispatchOutcome>, ConnectorError> {
123        let event = self.normalize_inbound(raw).await?;
124        runtime
125            .push_event_with_gate(event, gate)
126            .await
127            .map_err(|error| ConnectorError::HarnRuntime(error.to_string()))
128    }
129}
130
131#[async_trait]
132impl Connector for StreamConnector {
133    fn provider_id(&self) -> &ProviderId {
134        &self.provider_id
135    }
136
137    fn kinds(&self) -> &[TriggerKind] {
138        &self.kinds
139    }
140
141    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
142        self.state
143            .write()
144            .expect("stream connector state poisoned")
145            .ctx = Some(ctx);
146        Ok(())
147    }
148
149    async fn activate(
150        &self,
151        bindings: &[TriggerBinding],
152    ) -> Result<ActivationHandle, ConnectorError> {
153        let mut configured = HashMap::new();
154        for binding in bindings {
155            let activated = ActivatedStreamBinding::from_binding(binding)?;
156            configured.insert(binding.binding_id.clone(), activated);
157        }
158
159        self.state
160            .write()
161            .expect("stream connector state poisoned")
162            .bindings = configured;
163        Ok(ActivationHandle::new(
164            self.provider_id().clone(),
165            bindings.len(),
166        ))
167    }
168
169    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
170        let _ctx = self.ctx()?;
171        let binding = self.binding_for_raw(&raw)?;
172        let body = normalized_body(&raw)?;
173        let kind = stream_event_kind(&binding, &body);
174        let dedupe_key = stream_dedupe_key(&binding, &raw, &body);
175        let provider_payload =
176            ProviderPayload::normalize(&self.provider_id, &kind, &raw.headers, body)
177                .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
178        let occurred_at = raw
179            .occurred_at
180            .or_else(|| infer_occurred_at(&provider_payload));
181
182        Ok(TriggerEvent {
183            id: TriggerEventId::new(),
184            provider: self.provider_id.clone(),
185            kind,
186            received_at: raw.received_at,
187            occurred_at,
188            dedupe_key,
189            trace_id: TraceId::new(),
190            tenant_id: raw.tenant_id,
191            headers: redact_headers(&raw.headers, &HeaderRedactionPolicy::default()),
192            batch: None,
193            raw_body: Some(raw.body),
194            provider_payload,
195            signature_status: SignatureStatus::Unsigned,
196            dedupe_claimed: false,
197        })
198    }
199
200    fn payload_schema(&self) -> ProviderPayloadSchema {
201        ProviderPayloadSchema::named(self.schema_name.clone())
202    }
203
204    fn client(&self) -> Arc<dyn ConnectorClient> {
205        self.client.clone()
206    }
207}
208
209impl ActivatedStreamBinding {
210    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
211        let config = binding.config.as_object().ok_or_else(|| {
212            ConnectorError::Activation(format!(
213                "stream binding '{}' config must be an object",
214                binding.binding_id
215            ))
216        })?;
217        let match_events = config
218            .get("match")
219            .and_then(|value| value.get("events"))
220            .and_then(JsonValue::as_array)
221            .map(|events| {
222                events
223                    .iter()
224                    .filter_map(JsonValue::as_str)
225                    .map(ToString::to_string)
226                    .collect::<Vec<_>>()
227            })
228            .unwrap_or_default();
229        let stream = config.get("stream").cloned().unwrap_or(JsonValue::Null);
230
231        Ok(Self {
232            match_events,
233            stream,
234        })
235    }
236}
237
238fn normalized_body(raw: &RawInbound) -> Result<JsonValue, ConnectorError> {
239    let content_type = header_value(&raw.headers, "content-type").unwrap_or_default();
240    if content_type.contains("json") {
241        return raw.json_body();
242    }
243    if let Ok(value) = serde_json::from_slice(&raw.body) {
244        return Ok(value);
245    }
246    use base64::Engine;
247    Ok(json!({
248        "raw_base64": base64::engine::general_purpose::STANDARD.encode(&raw.body),
249        "raw_utf8": std::str::from_utf8(&raw.body).ok(),
250    }))
251}
252
253fn stream_event_kind(binding: &ActivatedStreamBinding, body: &JsonValue) -> String {
254    body.get("kind")
255        .and_then(JsonValue::as_str)
256        .or_else(|| body.get("event").and_then(JsonValue::as_str))
257        .or_else(|| body.get("type").and_then(JsonValue::as_str))
258        .map(ToString::to_string)
259        .or_else(|| binding.match_events.first().cloned())
260        .unwrap_or_else(|| "stream.message".to_string())
261}
262
263fn stream_dedupe_key(
264    binding: &ActivatedStreamBinding,
265    raw: &RawInbound,
266    body: &JsonValue,
267) -> String {
268    header_value(&raw.headers, "x-harn-stream-id")
269        .map(ToString::to_string)
270        .or_else(|| stringish(body, &["dedupe_key", "event_id", "id", "key", "message_id"]))
271        .or_else(|| {
272            let stream_name = stringish(body, &["stream", "topic", "subject", "channel", "slot"])
273                .or_else(|| {
274                    stringish(
275                        &binding.stream,
276                        &["stream", "topic", "subject", "channel", "slot"],
277                    )
278                });
279            let offset = stringish(body, &["offset", "sequence", "lsn"]);
280            match (stream_name, offset) {
281                (Some(stream), Some(offset)) => Some(format!("{stream}:{offset}")),
282                _ => None,
283            }
284        })
285        .unwrap_or_else(|| fallback_body_digest(&raw.body))
286}
287
288fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
289    let ProviderPayload::Known(known) = payload else {
290        return None;
291    };
292    let payload = match known {
293        crate::triggers::event::KnownProviderPayload::Kafka(payload)
294        | crate::triggers::event::KnownProviderPayload::Nats(payload)
295        | crate::triggers::event::KnownProviderPayload::Pulsar(payload)
296        | crate::triggers::event::KnownProviderPayload::PostgresCdc(payload)
297        | crate::triggers::event::KnownProviderPayload::Email(payload)
298        | crate::triggers::event::KnownProviderPayload::Websocket(payload) => payload,
299        _ => return None,
300    };
301    payload.timestamp.as_deref().and_then(|timestamp| {
302        OffsetDateTime::parse(timestamp, &time::format_description::well_known::Rfc3339).ok()
303    })
304}
305
306fn stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
307    fields.iter().find_map(|field| {
308        let value = raw.get(*field)?;
309        value
310            .as_str()
311            .map(ToString::to_string)
312            .or_else(|| value.as_i64().map(|number| number.to_string()))
313            .or_else(|| value.as_u64().map(|number| number.to_string()))
314    })
315}
316
/// Looks up a header by name, ignoring ASCII case.
///
/// When several keys differ only by case, the first one in the map's
/// iteration order is returned.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
323
324fn fallback_body_digest(body: &[u8]) -> String {
325    let digest = Sha256::digest(body);
326    let mut encoded = String::with_capacity(digest.len() * 2);
327    for byte in digest {
328        encoded.push_str(&format!("{byte:02x}"));
329    }
330    format!("sha256:{encoded}")
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{RateLimiterFactory, TriggerBinding};
    use crate::event_log::{install_memory_for_current_thread, reset_active_event_log};
    use crate::secrets::{
        RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
    };
    use crate::triggers::InboxIndex;

    /// Secret provider stub: lookups fail with `NotFound`, writes are
    /// accepted and discarded, and listings are empty.
    struct EmptySecretProvider;

    #[async_trait::async_trait]
    impl SecretProvider for EmptySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            let provider = self.namespace().to_string();
            Err(SecretError::NotFound {
                provider,
                id: id.clone(),
            })
        }

        async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
            Ok(())
        }

        async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
            let provider = self.namespace().to_string();
            Ok(RotationHandle {
                provider,
                id: id.clone(),
                from_version: None,
                to_version: None,
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            "empty"
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    /// Headers declaring a JSON body.
    fn json_headers() -> BTreeMap<String, String> {
        let mut headers = BTreeMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());
        headers
    }

    /// Kafka stream binding matching a single event on a single topic.
    fn kafka_stream_binding(binding_id: &str, event: &str, topic: &str) -> TriggerBinding {
        TriggerBinding {
            provider: ProviderId::from("kafka"),
            kind: TriggerKind::from("stream"),
            binding_id: binding_id.to_string(),
            dedupe_key: None,
            dedupe_retention_days: 7,
            config: json!({
                "match": {"events": [event]},
                "stream": {"topic": topic}
            }),
        }
    }

    #[tokio::test]
    async fn stream_connector_normalizes_json_inbound() {
        install_memory_for_current_thread(128);
        let log = crate::event_log::active_event_log().expect("event log");
        let inbox_index = InboxIndex::new(
            log.clone(),
            Arc::new(crate::connectors::MetricsRegistry::default()),
        )
        .await
        .expect("inbox");
        let inbox = Arc::new(inbox_index);

        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        let ctx = ConnectorCtx {
            event_log: log,
            secrets: Arc::new(EmptySecretProvider),
            inbox,
            metrics: Arc::new(crate::connectors::MetricsRegistry::default()),
            rate_limiter: Arc::new(RateLimiterFactory::default()),
        };
        connector.init(ctx).await.expect("init");
        connector
            .activate(&[kafka_stream_binding("quotes", "quote.tick", "quotes")])
            .await
            .expect("activate");

        let body = serde_json::to_vec(&json!({
            "key": "acct-1",
            "offset": 42,
            "value": {"amount": 10}
        }))
        .unwrap();
        let mut inbound = RawInbound::new("", json_headers(), body);
        inbound.metadata = json!({"binding_id": "quotes"});

        let event = connector.normalize_inbound(inbound).await.expect("event");
        assert_eq!(event.provider.as_str(), "kafka");
        assert_eq!(event.kind, "quote.tick");
        assert_eq!(event.dedupe_key, "acct-1");
        let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Kafka(payload)) =
            event.provider_payload
        else {
            panic!("expected kafka stream payload");
        };
        assert_eq!(payload.stream.as_deref(), None);
        assert_eq!(payload.key.as_deref(), Some("acct-1"));
        reset_active_event_log();
    }

    #[tokio::test]
    async fn stream_connector_pushes_inbound_through_stream_runtime() {
        install_memory_for_current_thread(128);
        let log = crate::event_log::active_event_log().expect("event log");
        let inbox_index = InboxIndex::new(
            log.clone(),
            Arc::new(crate::connectors::MetricsRegistry::default()),
        )
        .await
        .expect("inbox");
        let inbox = Arc::new(inbox_index);

        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        let ctx = ConnectorCtx {
            event_log: log.clone(),
            secrets: Arc::new(EmptySecretProvider),
            inbox,
            metrics: Arc::new(crate::connectors::MetricsRegistry::default()),
            rate_limiter: Arc::new(RateLimiterFactory::default()),
        };
        connector.init(ctx).await.expect("init");
        connector
            .activate(&[kafka_stream_binding("chat", "chat.message", "chat")])
            .await
            .expect("activate");

        let mut vm = crate::vm::Vm::new();
        crate::stdlib::register_vm_stdlib(&mut vm);
        let dispatcher = crate::triggers::Dispatcher::with_event_log(vm, log.clone());
        let runtime_config = crate::triggers::StreamTriggerConfig {
            stream_id: "chat".to_string(),
            window: crate::triggers::StreamWindowConfig::fixed(2),
            backpressure: crate::triggers::StreamBackpressureConfig::default(),
            flow: crate::triggers::StreamFlowConfig::default(),
            gate: None,
        };
        let mut runtime =
            crate::triggers::StreamTriggerRuntime::new(runtime_config, log.clone(), dispatcher)
                .expect("runtime");

        let body = serde_json::to_vec(&json!({
            "event": "chat.message",
            "stream": "chat",
            "offset": 1,
            "text": "hello"
        }))
        .unwrap();
        let mut inbound = RawInbound::new("", json_headers(), body);
        inbound.metadata = json!({"binding_id": "chat"});

        let outcomes = connector
            .push_inbound(&mut runtime, inbound)
            .await
            .expect("push through runtime");
        // One event pushed into a fixed window of two: nothing is dispatched
        // and the event stays pending.
        assert!(outcomes.is_empty());
        assert_eq!(runtime.snapshot().pending_events, 1);
        reset_active_event_log();
    }
}
523}