1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Arc, RwLock};
3
4use async_trait::async_trait;
5use serde_json::{json, Value as JsonValue};
6use sha2::{Digest, Sha256};
7use time::OffsetDateTime;
8
9use crate::connectors::{
10 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
11 ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
12};
13use crate::triggers::{
14 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
15 TriggerEvent, TriggerEventId,
16};
17
18pub struct StreamConnector {
19 provider_id: ProviderId,
20 kinds: Vec<TriggerKind>,
21 schema_name: String,
22 client: Arc<StreamClient>,
23 state: RwLock<ConnectorState>,
24}
25
26#[derive(Default)]
27struct ConnectorState {
28 ctx: Option<ConnectorCtx>,
29 bindings: HashMap<String, ActivatedStreamBinding>,
30}
31
32#[derive(Clone, Debug)]
33struct ActivatedStreamBinding {
34 match_events: Vec<String>,
35 stream: JsonValue,
36}
37
/// Stateless client for stream providers; its `ConnectorClient::call`
/// implementation rejects every method.
#[derive(Default)]
struct StreamClient;
40
41#[async_trait]
42impl ConnectorClient for StreamClient {
43 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
44 Err(ClientError::MethodNotFound(format!(
45 "stream connector has no outbound method `{method}`"
46 )))
47 }
48}
49
50impl StreamConnector {
51 pub fn new(provider_id: ProviderId, schema_name: impl Into<String>) -> Self {
52 Self {
53 provider_id,
54 kinds: vec![TriggerKind::from("stream")],
55 schema_name: schema_name.into(),
56 client: Arc::new(StreamClient),
57 state: RwLock::new(ConnectorState::default()),
58 }
59 }
60
61 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedStreamBinding, ConnectorError> {
62 let state = self.state.read().expect("stream connector state poisoned");
63 let binding = if let Some(binding_id) =
64 raw.metadata.get("binding_id").and_then(JsonValue::as_str)
65 {
66 state.bindings.get(binding_id).cloned().ok_or_else(|| {
67 ConnectorError::Unsupported(format!(
68 "stream connector has no active binding `{binding_id}`"
69 ))
70 })?
71 } else if state.bindings.len() == 1 {
72 state
73 .bindings
74 .values()
75 .next()
76 .cloned()
77 .expect("checked single binding")
78 } else {
79 return Err(ConnectorError::Unsupported(
80 "stream connector requires raw.metadata.binding_id when multiple bindings are active"
81 .to_string(),
82 ));
83 };
84 Ok(binding)
85 }
86
87 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
88 self.state
89 .read()
90 .expect("stream connector state poisoned")
91 .ctx
92 .clone()
93 .ok_or_else(|| {
94 ConnectorError::Activation(
95 "stream connector must be initialized before use".to_string(),
96 )
97 })
98 }
99}
100
101#[async_trait]
102impl Connector for StreamConnector {
103 fn provider_id(&self) -> &ProviderId {
104 &self.provider_id
105 }
106
107 fn kinds(&self) -> &[TriggerKind] {
108 &self.kinds
109 }
110
111 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
112 self.state
113 .write()
114 .expect("stream connector state poisoned")
115 .ctx = Some(ctx);
116 Ok(())
117 }
118
119 async fn activate(
120 &self,
121 bindings: &[TriggerBinding],
122 ) -> Result<ActivationHandle, ConnectorError> {
123 let mut configured = HashMap::new();
124 for binding in bindings {
125 let activated = ActivatedStreamBinding::from_binding(binding)?;
126 configured.insert(binding.binding_id.clone(), activated);
127 }
128
129 self.state
130 .write()
131 .expect("stream connector state poisoned")
132 .bindings = configured;
133 Ok(ActivationHandle::new(
134 self.provider_id().clone(),
135 bindings.len(),
136 ))
137 }
138
139 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
140 let _ctx = self.ctx()?;
141 let binding = self.binding_for_raw(&raw)?;
142 let body = normalized_body(&raw)?;
143 let kind = stream_event_kind(&binding, &body);
144 let dedupe_key = stream_dedupe_key(&binding, &raw, &body);
145 let provider_payload =
146 ProviderPayload::normalize(&self.provider_id, &kind, &raw.headers, body)
147 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
148 let occurred_at = raw
149 .occurred_at
150 .or_else(|| infer_occurred_at(&provider_payload));
151
152 Ok(TriggerEvent {
153 id: TriggerEventId::new(),
154 provider: self.provider_id.clone(),
155 kind,
156 received_at: raw.received_at,
157 occurred_at,
158 dedupe_key,
159 trace_id: TraceId::new(),
160 tenant_id: raw.tenant_id,
161 headers: redact_headers(&raw.headers, &HeaderRedactionPolicy::default()),
162 batch: None,
163 raw_body: Some(raw.body),
164 provider_payload,
165 signature_status: SignatureStatus::Unsigned,
166 dedupe_claimed: false,
167 })
168 }
169
170 fn payload_schema(&self) -> ProviderPayloadSchema {
171 ProviderPayloadSchema::named(self.schema_name.clone())
172 }
173
174 fn client(&self) -> Arc<dyn ConnectorClient> {
175 self.client.clone()
176 }
177}
178
179impl ActivatedStreamBinding {
180 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
181 let config = binding.config.as_object().ok_or_else(|| {
182 ConnectorError::Activation(format!(
183 "stream binding '{}' config must be an object",
184 binding.binding_id
185 ))
186 })?;
187 let match_events = config
188 .get("match")
189 .and_then(|value| value.get("events"))
190 .and_then(JsonValue::as_array)
191 .map(|events| {
192 events
193 .iter()
194 .filter_map(JsonValue::as_str)
195 .map(ToString::to_string)
196 .collect::<Vec<_>>()
197 })
198 .unwrap_or_default();
199 let stream = config.get("stream").cloned().unwrap_or(JsonValue::Null);
200
201 Ok(Self {
202 match_events,
203 stream,
204 })
205 }
206}
207
208fn normalized_body(raw: &RawInbound) -> Result<JsonValue, ConnectorError> {
209 let content_type = header_value(&raw.headers, "content-type").unwrap_or_default();
210 if content_type.contains("json") {
211 return raw.json_body();
212 }
213 if let Ok(value) = serde_json::from_slice(&raw.body) {
214 return Ok(value);
215 }
216 use base64::Engine;
217 Ok(json!({
218 "raw_base64": base64::engine::general_purpose::STANDARD.encode(&raw.body),
219 "raw_utf8": std::str::from_utf8(&raw.body).ok(),
220 }))
221}
222
223fn stream_event_kind(binding: &ActivatedStreamBinding, body: &JsonValue) -> String {
224 body.get("kind")
225 .and_then(JsonValue::as_str)
226 .or_else(|| body.get("event").and_then(JsonValue::as_str))
227 .or_else(|| body.get("type").and_then(JsonValue::as_str))
228 .map(ToString::to_string)
229 .or_else(|| binding.match_events.first().cloned())
230 .unwrap_or_else(|| "stream.message".to_string())
231}
232
233fn stream_dedupe_key(
234 binding: &ActivatedStreamBinding,
235 raw: &RawInbound,
236 body: &JsonValue,
237) -> String {
238 header_value(&raw.headers, "x-harn-stream-id")
239 .map(ToString::to_string)
240 .or_else(|| stringish(body, &["dedupe_key", "event_id", "id", "key", "message_id"]))
241 .or_else(|| {
242 let stream_name = stringish(body, &["stream", "topic", "subject", "channel", "slot"])
243 .or_else(|| {
244 stringish(
245 &binding.stream,
246 &["stream", "topic", "subject", "channel", "slot"],
247 )
248 });
249 let offset = stringish(body, &["offset", "sequence", "lsn"]);
250 match (stream_name, offset) {
251 (Some(stream), Some(offset)) => Some(format!("{stream}:{offset}")),
252 _ => None,
253 }
254 })
255 .unwrap_or_else(|| fallback_body_digest(&raw.body))
256}
257
258fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
259 let ProviderPayload::Known(known) = payload else {
260 return None;
261 };
262 let payload = match known {
263 crate::triggers::event::KnownProviderPayload::Kafka(payload)
264 | crate::triggers::event::KnownProviderPayload::Nats(payload)
265 | crate::triggers::event::KnownProviderPayload::Pulsar(payload)
266 | crate::triggers::event::KnownProviderPayload::PostgresCdc(payload)
267 | crate::triggers::event::KnownProviderPayload::Email(payload)
268 | crate::triggers::event::KnownProviderPayload::Websocket(payload) => payload,
269 _ => return None,
270 };
271 payload.timestamp.as_deref().and_then(|timestamp| {
272 OffsetDateTime::parse(timestamp, &time::format_description::well_known::Rfc3339).ok()
273 })
274}
275
276fn stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
277 fields.iter().find_map(|field| {
278 let value = raw.get(*field)?;
279 value
280 .as_str()
281 .map(ToString::to_string)
282 .or_else(|| value.as_i64().map(|number| number.to_string()))
283 .or_else(|| value.as_u64().map(|number| number.to_string()))
284 })
285}
286
/// Looks up a header by name, ignoring ASCII case.
///
/// When several keys differ only by case, the first one in the map's
/// iteration order is returned.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
293
294fn fallback_body_digest(body: &[u8]) -> String {
295 let digest = Sha256::digest(body);
296 let mut encoded = String::with_capacity(digest.len() * 2);
297 for byte in digest {
298 encoded.push_str(&format!("{byte:02x}"));
299 }
300 format!("sha256:{encoded}")
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{MetricsRegistry, RateLimiterFactory, TriggerBinding};
    use crate::event_log::{install_memory_for_current_thread, reset_active_event_log};
    use crate::secrets::{
        RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
    };
    use crate::triggers::InboxIndex;

    /// Secret provider with no secrets: lookups fail, writes are accepted and
    /// discarded, listings are empty.
    struct EmptySecretProvider;

    #[async_trait::async_trait]
    impl SecretProvider for EmptySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            let provider = self.namespace().to_string();
            Err(SecretError::NotFound {
                provider,
                id: id.clone(),
            })
        }

        async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
            Ok(())
        }

        async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
            let provider = self.namespace().to_string();
            Ok(RotationHandle {
                provider,
                id: id.clone(),
                from_version: None,
                to_version: None,
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(vec![])
        }

        fn namespace(&self) -> &str {
            "empty"
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    /// Builds a `kafka` connector and initializes it against the event log
    /// currently installed for this thread.
    async fn initialized_kafka_connector() -> StreamConnector {
        let event_log = crate::event_log::active_event_log().expect("event log");
        let inbox_index = InboxIndex::new(event_log.clone(), Arc::new(MetricsRegistry::default()))
            .await
            .expect("inbox");
        let inbox = Arc::new(inbox_index);

        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        let ctx = ConnectorCtx {
            event_log,
            secrets: Arc::new(EmptySecretProvider),
            inbox,
            metrics: Arc::new(MetricsRegistry::default()),
            rate_limiter: Arc::new(RateLimiterFactory::default()),
        };
        connector.init(ctx).await.expect("init");
        connector
    }

    /// Binding `quotes` that matches `quote.tick` on topic `quotes`.
    fn quotes_binding() -> TriggerBinding {
        TriggerBinding {
            provider: ProviderId::from("kafka"),
            kind: TriggerKind::from("stream"),
            binding_id: "quotes".to_string(),
            dedupe_key: None,
            dedupe_retention_days: 7,
            config: json!({
                "match": {"events": ["quote.tick"]},
                "stream": {"topic": "quotes"}
            }),
        }
    }

    /// JSON message addressed to the `quotes` binding through its metadata.
    fn quotes_inbound() -> RawInbound {
        let mut headers = BTreeMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());
        let body = serde_json::to_vec(&json!({
            "key": "acct-1",
            "offset": 42,
            "value": {"amount": 10}
        }))
        .unwrap();

        let mut raw = RawInbound::new("", headers, body);
        raw.metadata = json!({"binding_id": "quotes"});
        raw
    }

    #[tokio::test]
    async fn stream_connector_normalizes_json_inbound() {
        install_memory_for_current_thread(128);
        let connector = initialized_kafka_connector().await;
        connector
            .activate(&[quotes_binding()])
            .await
            .expect("activate");

        let event = connector
            .normalize_inbound(quotes_inbound())
            .await
            .expect("event");

        assert_eq!(event.provider.as_str(), "kafka");
        // The body names no kind, so the binding's first match event is used.
        assert_eq!(event.kind, "quote.tick");
        // The body's `key` field outranks the stream/offset pair.
        assert_eq!(event.dedupe_key, "acct-1");
        let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Kafka(payload)) =
            event.provider_payload
        else {
            panic!("expected kafka stream payload");
        };
        assert_eq!(payload.stream.as_deref(), None);
        assert_eq!(payload.key.as_deref(), Some("acct-1"));
        reset_active_event_log();
    }
}