Skip to main content

harn_vm/connectors/
stream.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Arc, RwLock};
3
4use async_trait::async_trait;
5use serde_json::{json, Value as JsonValue};
6use sha2::{Digest, Sha256};
7use time::OffsetDateTime;
8
9use crate::connectors::{
10    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
11    ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
12};
13use crate::triggers::{
14    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
15    TriggerEvent, TriggerEventId,
16};
17
/// Inbound connector for stream-style providers.
///
/// The provider id is supplied by the caller (this file's test uses `kafka`). The
/// connector normalizes raw inbound messages into [`TriggerEvent`]s and exposes no
/// outbound client methods.
pub struct StreamConnector {
    /// Provider reported on normalized events and activation handles.
    provider_id: ProviderId,
    /// Trigger kinds served by this connector; `new` sets this to just `stream`.
    kinds: Vec<TriggerKind>,
    /// Schema name returned (via `ProviderPayloadSchema::named`) from `payload_schema`.
    schema_name: String,
    /// Outbound client; every call on it fails with `MethodNotFound`.
    client: Arc<StreamClient>,
    /// Context stored by `init` plus the bindings installed by the latest `activate`.
    state: RwLock<ConnectorState>,
}
25
/// Mutable state of a [`StreamConnector`], guarded by its `RwLock`.
#[derive(Default)]
struct ConnectorState {
    /// Set by `init`; `None` until the connector has been initialized.
    ctx: Option<ConnectorCtx>,
    /// Active bindings keyed by `TriggerBinding::binding_id`; `activate` replaces the
    /// whole map at once.
    bindings: HashMap<String, ActivatedStreamBinding>,
}
31
/// Per-binding settings extracted from `TriggerBinding::config` during activation.
#[derive(Clone, Debug)]
struct ActivatedStreamBinding {
    /// String entries of `config.match.events`; the first one is used as the event
    /// kind when the message body does not name one.
    match_events: Vec<String>,
    /// Raw `config.stream` value (`Null` when absent); consulted for a stream name
    /// when building dedupe keys.
    stream: JsonValue,
}
37
/// Outbound client for [`StreamConnector`]; it implements no methods and rejects
/// every call.
#[derive(Default)]
struct StreamClient;
40
41#[async_trait]
42impl ConnectorClient for StreamClient {
43    async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
44        Err(ClientError::MethodNotFound(format!(
45            "stream connector has no outbound method `{method}`"
46        )))
47    }
48}
49
50impl StreamConnector {
51    pub fn new(provider_id: ProviderId, schema_name: impl Into<String>) -> Self {
52        Self {
53            provider_id,
54            kinds: vec![TriggerKind::from("stream")],
55            schema_name: schema_name.into(),
56            client: Arc::new(StreamClient),
57            state: RwLock::new(ConnectorState::default()),
58        }
59    }
60
61    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedStreamBinding, ConnectorError> {
62        let state = self.state.read().expect("stream connector state poisoned");
63        let binding = if let Some(binding_id) =
64            raw.metadata.get("binding_id").and_then(JsonValue::as_str)
65        {
66            state.bindings.get(binding_id).cloned().ok_or_else(|| {
67                ConnectorError::Unsupported(format!(
68                    "stream connector has no active binding `{binding_id}`"
69                ))
70            })?
71        } else if state.bindings.len() == 1 {
72            state
73                .bindings
74                .values()
75                .next()
76                .cloned()
77                .expect("checked single binding")
78        } else {
79            return Err(ConnectorError::Unsupported(
80                "stream connector requires raw.metadata.binding_id when multiple bindings are active"
81                    .to_string(),
82            ));
83        };
84        Ok(binding)
85    }
86
87    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
88        self.state
89            .read()
90            .expect("stream connector state poisoned")
91            .ctx
92            .clone()
93            .ok_or_else(|| {
94                ConnectorError::Activation(
95                    "stream connector must be initialized before use".to_string(),
96                )
97            })
98    }
99}
100
101#[async_trait]
102impl Connector for StreamConnector {
103    fn provider_id(&self) -> &ProviderId {
104        &self.provider_id
105    }
106
107    fn kinds(&self) -> &[TriggerKind] {
108        &self.kinds
109    }
110
111    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
112        self.state
113            .write()
114            .expect("stream connector state poisoned")
115            .ctx = Some(ctx);
116        Ok(())
117    }
118
119    async fn activate(
120        &self,
121        bindings: &[TriggerBinding],
122    ) -> Result<ActivationHandle, ConnectorError> {
123        let mut configured = HashMap::new();
124        for binding in bindings {
125            let activated = ActivatedStreamBinding::from_binding(binding)?;
126            configured.insert(binding.binding_id.clone(), activated);
127        }
128
129        self.state
130            .write()
131            .expect("stream connector state poisoned")
132            .bindings = configured;
133        Ok(ActivationHandle::new(
134            self.provider_id().clone(),
135            bindings.len(),
136        ))
137    }
138
139    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
140        let _ctx = self.ctx()?;
141        let binding = self.binding_for_raw(&raw)?;
142        let body = normalized_body(&raw)?;
143        let kind = stream_event_kind(&binding, &body);
144        let dedupe_key = stream_dedupe_key(&binding, &raw, &body);
145        let provider_payload =
146            ProviderPayload::normalize(&self.provider_id, &kind, &raw.headers, body)
147                .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
148        let occurred_at = raw
149            .occurred_at
150            .or_else(|| infer_occurred_at(&provider_payload));
151
152        Ok(TriggerEvent {
153            id: TriggerEventId::new(),
154            provider: self.provider_id.clone(),
155            kind,
156            received_at: raw.received_at,
157            occurred_at,
158            dedupe_key,
159            trace_id: TraceId::new(),
160            tenant_id: raw.tenant_id,
161            headers: redact_headers(&raw.headers, &HeaderRedactionPolicy::default()),
162            batch: None,
163            raw_body: Some(raw.body),
164            provider_payload,
165            signature_status: SignatureStatus::Unsigned,
166            dedupe_claimed: false,
167        })
168    }
169
170    fn payload_schema(&self) -> ProviderPayloadSchema {
171        ProviderPayloadSchema::named(self.schema_name.clone())
172    }
173
174    fn client(&self) -> Arc<dyn ConnectorClient> {
175        self.client.clone()
176    }
177}
178
179impl ActivatedStreamBinding {
180    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
181        let config = binding.config.as_object().ok_or_else(|| {
182            ConnectorError::Activation(format!(
183                "stream binding '{}' config must be an object",
184                binding.binding_id
185            ))
186        })?;
187        let match_events = config
188            .get("match")
189            .and_then(|value| value.get("events"))
190            .and_then(JsonValue::as_array)
191            .map(|events| {
192                events
193                    .iter()
194                    .filter_map(JsonValue::as_str)
195                    .map(ToString::to_string)
196                    .collect::<Vec<_>>()
197            })
198            .unwrap_or_default();
199        let stream = config.get("stream").cloned().unwrap_or(JsonValue::Null);
200
201        Ok(Self {
202            match_events,
203            stream,
204        })
205    }
206}
207
208fn normalized_body(raw: &RawInbound) -> Result<JsonValue, ConnectorError> {
209    let content_type = header_value(&raw.headers, "content-type").unwrap_or_default();
210    if content_type.contains("json") {
211        return raw.json_body();
212    }
213    if let Ok(value) = serde_json::from_slice(&raw.body) {
214        return Ok(value);
215    }
216    use base64::Engine;
217    Ok(json!({
218        "raw_base64": base64::engine::general_purpose::STANDARD.encode(&raw.body),
219        "raw_utf8": std::str::from_utf8(&raw.body).ok(),
220    }))
221}
222
223fn stream_event_kind(binding: &ActivatedStreamBinding, body: &JsonValue) -> String {
224    body.get("kind")
225        .and_then(JsonValue::as_str)
226        .or_else(|| body.get("event").and_then(JsonValue::as_str))
227        .or_else(|| body.get("type").and_then(JsonValue::as_str))
228        .map(ToString::to_string)
229        .or_else(|| binding.match_events.first().cloned())
230        .unwrap_or_else(|| "stream.message".to_string())
231}
232
233fn stream_dedupe_key(
234    binding: &ActivatedStreamBinding,
235    raw: &RawInbound,
236    body: &JsonValue,
237) -> String {
238    header_value(&raw.headers, "x-harn-stream-id")
239        .map(ToString::to_string)
240        .or_else(|| stringish(body, &["dedupe_key", "event_id", "id", "key", "message_id"]))
241        .or_else(|| {
242            let stream_name = stringish(body, &["stream", "topic", "subject", "channel", "slot"])
243                .or_else(|| {
244                    stringish(
245                        &binding.stream,
246                        &["stream", "topic", "subject", "channel", "slot"],
247                    )
248                });
249            let offset = stringish(body, &["offset", "sequence", "lsn"]);
250            match (stream_name, offset) {
251                (Some(stream), Some(offset)) => Some(format!("{stream}:{offset}")),
252                _ => None,
253            }
254        })
255        .unwrap_or_else(|| fallback_body_digest(&raw.body))
256}
257
258fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
259    let ProviderPayload::Known(known) = payload else {
260        return None;
261    };
262    let payload = match known {
263        crate::triggers::event::KnownProviderPayload::Kafka(payload)
264        | crate::triggers::event::KnownProviderPayload::Nats(payload)
265        | crate::triggers::event::KnownProviderPayload::Pulsar(payload)
266        | crate::triggers::event::KnownProviderPayload::PostgresCdc(payload)
267        | crate::triggers::event::KnownProviderPayload::Email(payload)
268        | crate::triggers::event::KnownProviderPayload::Websocket(payload) => payload,
269        _ => return None,
270    };
271    payload.timestamp.as_deref().and_then(|timestamp| {
272        OffsetDateTime::parse(timestamp, &time::format_description::well_known::Rfc3339).ok()
273    })
274}
275
276fn stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
277    fields.iter().find_map(|field| {
278        let value = raw.get(*field)?;
279        value
280            .as_str()
281            .map(ToString::to_string)
282            .or_else(|| value.as_i64().map(|number| number.to_string()))
283            .or_else(|| value.as_u64().map(|number| number.to_string()))
284    })
285}
286
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first matching key in the map's sorted iteration
/// order, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
293
294fn fallback_body_digest(body: &[u8]) -> String {
295    let digest = Sha256::digest(body);
296    let mut encoded = String::with_capacity(digest.len() * 2);
297    for byte in digest {
298        encoded.push_str(&format!("{byte:02x}"));
299    }
300    format!("sha256:{encoded}")
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{RateLimiterFactory, TriggerBinding};
    use crate::event_log::{install_memory_for_current_thread, reset_active_event_log};
    use crate::secrets::{
        RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
    };
    use crate::triggers::InboxIndex;

    /// Secret provider stub for tests. `get` always reports `NotFound`, `put` and
    /// `rotate` succeed without storing anything, and `list` is always empty.
    struct EmptySecretProvider;

    #[async_trait::async_trait]
    impl SecretProvider for EmptySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            Err(SecretError::NotFound {
                provider: self.namespace().to_string(),
                id: id.clone(),
            })
        }

        async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
            Ok(())
        }

        async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
            Ok(RotationHandle {
                provider: self.namespace().to_string(),
                id: id.clone(),
                from_version: None,
                to_version: None,
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            "empty"
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    /// End-to-end check: activates one `kafka` binding, normalizes a JSON message
    /// routed to it, and verifies the derived kind, dedupe key and Kafka payload.
    #[tokio::test]
    async fn stream_connector_normalizes_json_inbound() {
        install_memory_for_current_thread(128);
        let event_log = crate::event_log::active_event_log().expect("event log");
        let inbox = Arc::new(
            InboxIndex::new(
                event_log.clone(),
                Arc::new(crate::connectors::MetricsRegistry::default()),
            )
            .await
            .expect("inbox"),
        );
        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        connector
            .init(ConnectorCtx {
                event_log,
                secrets: Arc::new(EmptySecretProvider),
                inbox,
                metrics: Arc::new(crate::connectors::MetricsRegistry::default()),
                rate_limiter: Arc::new(RateLimiterFactory::default()),
            })
            .await
            .expect("init");
        connector
            .activate(&[TriggerBinding {
                provider: ProviderId::from("kafka"),
                kind: TriggerKind::from("stream"),
                binding_id: "quotes".to_string(),
                dedupe_key: None,
                dedupe_retention_days: 7,
                config: json!({
                    "match": {"events": ["quote.tick"]},
                    "stream": {"topic": "quotes"}
                }),
            }])
            .await
            .expect("activate");

        let mut headers = BTreeMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());
        let mut raw = RawInbound::new(
            "",
            headers,
            serde_json::to_vec(&json!({
                "key": "acct-1",
                "offset": 42,
                "value": {"amount": 10}
            }))
            .unwrap(),
        );
        // Route the message to the `quotes` binding explicitly.
        raw.metadata = json!({"binding_id": "quotes"});

        let event = connector.normalize_inbound(raw).await.expect("event");
        assert_eq!(event.provider.as_str(), "kafka");
        // The body has no `kind`/`event`/`type` field, so the kind falls back to the
        // binding's first match event.
        assert_eq!(event.kind, "quote.tick");
        // `key` is checked before the stream/offset fallback when deriving dedupe keys.
        assert_eq!(event.dedupe_key, "acct-1");
        let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Kafka(payload)) =
            event.provider_payload
        else {
            panic!("expected kafka stream payload");
        };
        // NOTE(review): `stream` stays `None` even though the binding config names
        // topic "quotes"; presumably `ProviderPayload::normalize` reads only the body.
        // Confirm.
        assert_eq!(payload.stream.as_deref(), None);
        assert_eq!(payload.key.as_deref(), Some("acct-1"));
        // Tear down the event log installed at the start of the test.
        reset_active_event_log();
    }
}