1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Arc, RwLock};
3
4use async_trait::async_trait;
5use serde_json::{json, Value as JsonValue};
6use sha2::{Digest, Sha256};
7use time::OffsetDateTime;
8
9use crate::connectors::{
10 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
11 ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
12};
13use crate::triggers::{
14 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus,
15 StreamGateOutcome, StreamTriggerRuntime, TraceId, TriggerEvent, TriggerEventId,
16};
17
18pub struct StreamConnector {
19 provider_id: ProviderId,
20 kinds: Vec<TriggerKind>,
21 schema_name: String,
22 client: Arc<StreamClient>,
23 state: RwLock<ConnectorState>,
24}
25
26#[derive(Default)]
27struct ConnectorState {
28 ctx: Option<ConnectorCtx>,
29 bindings: HashMap<String, ActivatedStreamBinding>,
30}
31
32#[derive(Clone, Debug)]
33struct ActivatedStreamBinding {
34 match_events: Vec<String>,
35 stream: JsonValue,
36}
37
38#[derive(Default)]
39struct StreamClient;
40
41#[async_trait]
42impl ConnectorClient for StreamClient {
43 async fn call(&self, method: &str, _args: JsonValue) -> Result<JsonValue, ClientError> {
44 Err(ClientError::MethodNotFound(format!(
45 "stream connector has no outbound method `{method}`"
46 )))
47 }
48}
49
50impl StreamConnector {
51 pub fn new(provider_id: ProviderId, schema_name: impl Into<String>) -> Self {
52 Self {
53 provider_id,
54 kinds: vec![TriggerKind::from("stream")],
55 schema_name: schema_name.into(),
56 client: Arc::new(StreamClient),
57 state: RwLock::new(ConnectorState::default()),
58 }
59 }
60
61 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedStreamBinding, ConnectorError> {
62 let state = self.state.read().expect("stream connector state poisoned");
63 let binding = if let Some(binding_id) =
64 raw.metadata.get("binding_id").and_then(JsonValue::as_str)
65 {
66 state.bindings.get(binding_id).cloned().ok_or_else(|| {
67 ConnectorError::Unsupported(format!(
68 "stream connector has no active binding `{binding_id}`"
69 ))
70 })?
71 } else if state.bindings.len() == 1 {
72 state
73 .bindings
74 .values()
75 .next()
76 .cloned()
77 .expect("checked single binding")
78 } else {
79 return Err(ConnectorError::Unsupported(
80 "stream connector requires raw.metadata.binding_id when multiple bindings are active"
81 .to_string(),
82 ));
83 };
84 Ok(binding)
85 }
86
87 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
88 self.state
89 .read()
90 .expect("stream connector state poisoned")
91 .ctx
92 .clone()
93 .ok_or_else(|| {
94 ConnectorError::Activation(
95 "stream connector must be initialized before use".to_string(),
96 )
97 })
98 }
99
100 pub async fn push_inbound(
103 &self,
104 runtime: &mut StreamTriggerRuntime,
105 raw: RawInbound,
106 ) -> Result<Vec<crate::triggers::DispatchOutcome>, ConnectorError> {
107 let event = self.normalize_inbound(raw).await?;
108 runtime
109 .push_event(event)
110 .await
111 .map_err(|error| ConnectorError::HarnRuntime(error.to_string()))
112 }
113
114 pub async fn push_inbound_with_gate(
118 &self,
119 runtime: &mut StreamTriggerRuntime,
120 raw: RawInbound,
121 gate: impl Fn(&crate::triggers::StreamWindowEnvelope) -> StreamGateOutcome,
122 ) -> Result<Vec<crate::triggers::DispatchOutcome>, ConnectorError> {
123 let event = self.normalize_inbound(raw).await?;
124 runtime
125 .push_event_with_gate(event, gate)
126 .await
127 .map_err(|error| ConnectorError::HarnRuntime(error.to_string()))
128 }
129}
130
131#[async_trait]
132impl Connector for StreamConnector {
133 fn provider_id(&self) -> &ProviderId {
134 &self.provider_id
135 }
136
137 fn kinds(&self) -> &[TriggerKind] {
138 &self.kinds
139 }
140
141 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
142 self.state
143 .write()
144 .expect("stream connector state poisoned")
145 .ctx = Some(ctx);
146 Ok(())
147 }
148
149 async fn activate(
150 &self,
151 bindings: &[TriggerBinding],
152 ) -> Result<ActivationHandle, ConnectorError> {
153 let mut configured = HashMap::new();
154 for binding in bindings {
155 let activated = ActivatedStreamBinding::from_binding(binding)?;
156 configured.insert(binding.binding_id.clone(), activated);
157 }
158
159 self.state
160 .write()
161 .expect("stream connector state poisoned")
162 .bindings = configured;
163 Ok(ActivationHandle::new(
164 self.provider_id().clone(),
165 bindings.len(),
166 ))
167 }
168
169 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
170 let _ctx = self.ctx()?;
171 let binding = self.binding_for_raw(&raw)?;
172 let body = normalized_body(&raw)?;
173 let kind = stream_event_kind(&binding, &body);
174 let dedupe_key = stream_dedupe_key(&binding, &raw, &body);
175 let provider_payload =
176 ProviderPayload::normalize(&self.provider_id, &kind, &raw.headers, body)
177 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
178 let occurred_at = raw
179 .occurred_at
180 .or_else(|| infer_occurred_at(&provider_payload));
181
182 Ok(TriggerEvent {
183 id: TriggerEventId::new(),
184 provider: self.provider_id.clone(),
185 kind,
186 received_at: raw.received_at,
187 occurred_at,
188 dedupe_key,
189 trace_id: TraceId::new(),
190 tenant_id: raw.tenant_id,
191 headers: redact_headers(&raw.headers, &HeaderRedactionPolicy::default()),
192 batch: None,
193 raw_body: Some(raw.body),
194 provider_payload,
195 signature_status: SignatureStatus::Unsigned,
196 dedupe_claimed: false,
197 })
198 }
199
200 fn payload_schema(&self) -> ProviderPayloadSchema {
201 ProviderPayloadSchema::named(self.schema_name.clone())
202 }
203
204 fn client(&self) -> Arc<dyn ConnectorClient> {
205 self.client.clone()
206 }
207}
208
209impl ActivatedStreamBinding {
210 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
211 let config = binding.config.as_object().ok_or_else(|| {
212 ConnectorError::Activation(format!(
213 "stream binding '{}' config must be an object",
214 binding.binding_id
215 ))
216 })?;
217 let match_events = config
218 .get("match")
219 .and_then(|value| value.get("events"))
220 .and_then(JsonValue::as_array)
221 .map(|events| {
222 events
223 .iter()
224 .filter_map(JsonValue::as_str)
225 .map(ToString::to_string)
226 .collect::<Vec<_>>()
227 })
228 .unwrap_or_default();
229 let stream = config.get("stream").cloned().unwrap_or(JsonValue::Null);
230
231 Ok(Self {
232 match_events,
233 stream,
234 })
235 }
236}
237
238fn normalized_body(raw: &RawInbound) -> Result<JsonValue, ConnectorError> {
239 let content_type = header_value(&raw.headers, "content-type").unwrap_or_default();
240 if content_type.contains("json") {
241 return raw.json_body();
242 }
243 if let Ok(value) = serde_json::from_slice(&raw.body) {
244 return Ok(value);
245 }
246 use base64::Engine;
247 Ok(json!({
248 "raw_base64": base64::engine::general_purpose::STANDARD.encode(&raw.body),
249 "raw_utf8": std::str::from_utf8(&raw.body).ok(),
250 }))
251}
252
253fn stream_event_kind(binding: &ActivatedStreamBinding, body: &JsonValue) -> String {
254 body.get("kind")
255 .and_then(JsonValue::as_str)
256 .or_else(|| body.get("event").and_then(JsonValue::as_str))
257 .or_else(|| body.get("type").and_then(JsonValue::as_str))
258 .map(ToString::to_string)
259 .or_else(|| binding.match_events.first().cloned())
260 .unwrap_or_else(|| "stream.message".to_string())
261}
262
263fn stream_dedupe_key(
264 binding: &ActivatedStreamBinding,
265 raw: &RawInbound,
266 body: &JsonValue,
267) -> String {
268 header_value(&raw.headers, "x-harn-stream-id")
269 .map(ToString::to_string)
270 .or_else(|| stringish(body, &["dedupe_key", "event_id", "id", "key", "message_id"]))
271 .or_else(|| {
272 let stream_name = stringish(body, &["stream", "topic", "subject", "channel", "slot"])
273 .or_else(|| {
274 stringish(
275 &binding.stream,
276 &["stream", "topic", "subject", "channel", "slot"],
277 )
278 });
279 let offset = stringish(body, &["offset", "sequence", "lsn"]);
280 match (stream_name, offset) {
281 (Some(stream), Some(offset)) => Some(format!("{stream}:{offset}")),
282 _ => None,
283 }
284 })
285 .unwrap_or_else(|| fallback_body_digest(&raw.body))
286}
287
288fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
289 let ProviderPayload::Known(known) = payload else {
290 return None;
291 };
292 let payload = match known {
293 crate::triggers::event::KnownProviderPayload::Kafka(payload)
294 | crate::triggers::event::KnownProviderPayload::Nats(payload)
295 | crate::triggers::event::KnownProviderPayload::Pulsar(payload)
296 | crate::triggers::event::KnownProviderPayload::PostgresCdc(payload)
297 | crate::triggers::event::KnownProviderPayload::Email(payload)
298 | crate::triggers::event::KnownProviderPayload::Websocket(payload) => payload,
299 _ => return None,
300 };
301 payload.timestamp.as_deref().and_then(|timestamp| {
302 OffsetDateTime::parse(timestamp, &time::format_description::well_known::Rfc3339).ok()
303 })
304}
305
306fn stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
307 fields.iter().find_map(|field| {
308 let value = raw.get(*field)?;
309 value
310 .as_str()
311 .map(ToString::to_string)
312 .or_else(|| value.as_i64().map(|number| number.to_string()))
313 .or_else(|| value.as_u64().map(|number| number.to_string()))
314 })
315}
316
/// Looks up a header by name, ignoring ASCII case.
///
/// The map is scanned in key order and the first matching key wins. When several case
/// variants are present, the lexicographically smallest key (e.g. `Content-Type` before
/// `content-type`) is returned.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
323
324fn fallback_body_digest(body: &[u8]) -> String {
325 let digest = Sha256::digest(body);
326 let mut encoded = String::with_capacity(digest.len() * 2);
327 for byte in digest {
328 encoded.push_str(&format!("{byte:02x}"));
329 }
330 format!("sha256:{encoded}")
331}
332
// Tests for `StreamConnector`. Each test installs an in-memory event log for the current
// thread before building the connector and resets it at the end, so the order of
// `install_memory_for_current_thread` / `reset_active_event_log` relative to the rest of the
// test body matters.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{RateLimiterFactory, TriggerBinding};
    use crate::event_log::{install_memory_for_current_thread, reset_active_event_log};
    use crate::secrets::{
        RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
    };
    use crate::triggers::InboxIndex;

    // Secret provider stub for building a `ConnectorCtx`:
    // - every `get` reports `NotFound`;
    // - `put` accepts and discards the value;
    // - `rotate` returns a handle without versions;
    // - `list` is always empty.
    struct EmptySecretProvider;

    #[async_trait::async_trait]
    impl SecretProvider for EmptySecretProvider {
        async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
            Err(SecretError::NotFound {
                provider: self.namespace().to_string(),
                id: id.clone(),
            })
        }

        async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
            Ok(())
        }

        async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
            Ok(RotationHandle {
                provider: self.namespace().to_string(),
                id: id.clone(),
                from_version: None,
                to_version: None,
            })
        }

        async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
            Ok(Vec::new())
        }

        fn namespace(&self) -> &str {
            "empty"
        }

        fn supports_versions(&self) -> bool {
            false
        }
    }

    // A JSON body routed by an explicit `binding_id` is expected to produce:
    // - the event kind from the binding's `match.events`, since the body carries no
    //   `kind`/`event`/`type`;
    // - the dedupe key from the body's `key` field;
    // - a Kafka provider payload whose `stream` is unset and whose `key` is carried through.
    #[tokio::test]
    async fn stream_connector_normalizes_json_inbound() {
        install_memory_for_current_thread(128);
        let event_log = crate::event_log::active_event_log().expect("event log");
        let inbox = Arc::new(
            InboxIndex::new(
                event_log.clone(),
                Arc::new(crate::connectors::MetricsRegistry::default()),
            )
            .await
            .expect("inbox"),
        );
        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        connector
            .init(ConnectorCtx {
                event_log,
                secrets: Arc::new(EmptySecretProvider),
                inbox,
                metrics: Arc::new(crate::connectors::MetricsRegistry::default()),
                rate_limiter: Arc::new(RateLimiterFactory::default()),
            })
            .await
            .expect("init");
        connector
            .activate(&[TriggerBinding {
                provider: ProviderId::from("kafka"),
                kind: TriggerKind::from("stream"),
                binding_id: "quotes".to_string(),
                dedupe_key: None,
                dedupe_retention_days: 7,
                config: json!({
                    "match": {"events": ["quote.tick"]},
                    "stream": {"topic": "quotes"}
                }),
            }])
            .await
            .expect("activate");

        let mut headers = BTreeMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());
        let mut raw = RawInbound::new(
            "",
            headers,
            serde_json::to_vec(&json!({
                "key": "acct-1",
                "offset": 42,
                "value": {"amount": 10}
            }))
            .unwrap(),
        );
        // Route explicitly to the "quotes" binding activated above.
        raw.metadata = json!({"binding_id": "quotes"});

        let event = connector.normalize_inbound(raw).await.expect("event");
        assert_eq!(event.provider.as_str(), "kafka");
        assert_eq!(event.kind, "quote.tick");
        // `key` outranks the topic:offset fallback in the dedupe-key priority list.
        assert_eq!(event.dedupe_key, "acct-1");
        let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Kafka(payload)) =
            event.provider_payload
        else {
            panic!("expected kafka stream payload");
        };
        assert_eq!(payload.stream.as_deref(), None);
        assert_eq!(payload.key.as_deref(), Some("acct-1"));
        // Tear down the event log installed for this thread at the start of the test.
        reset_active_event_log();
    }

    // Pushing one message through a `StreamTriggerRuntime` configured with
    // `StreamWindowConfig::fixed(2)` (presumably a two-event window; confirm) yields no
    // dispatch outcomes and leaves exactly one pending event.
    #[tokio::test]
    async fn stream_connector_pushes_inbound_through_stream_runtime() {
        install_memory_for_current_thread(128);
        let event_log = crate::event_log::active_event_log().expect("event log");
        let inbox = Arc::new(
            InboxIndex::new(
                event_log.clone(),
                Arc::new(crate::connectors::MetricsRegistry::default()),
            )
            .await
            .expect("inbox"),
        );
        let mut connector = StreamConnector::new(ProviderId::from("kafka"), "StreamEventPayload");
        connector
            .init(ConnectorCtx {
                event_log: event_log.clone(),
                secrets: Arc::new(EmptySecretProvider),
                inbox,
                metrics: Arc::new(crate::connectors::MetricsRegistry::default()),
                rate_limiter: Arc::new(RateLimiterFactory::default()),
            })
            .await
            .expect("init");
        connector
            .activate(&[TriggerBinding {
                provider: ProviderId::from("kafka"),
                kind: TriggerKind::from("stream"),
                binding_id: "chat".to_string(),
                dedupe_key: None,
                dedupe_retention_days: 7,
                config: json!({
                    "match": {"events": ["chat.message"]},
                    "stream": {"topic": "chat"}
                }),
            }])
            .await
            .expect("activate");

        // The same event log backs both the connector context and the runtime/dispatcher.
        let mut vm = crate::vm::Vm::new();
        crate::stdlib::register_vm_stdlib(&mut vm);
        let dispatcher = crate::triggers::Dispatcher::with_event_log(vm, event_log.clone());
        let mut runtime = crate::triggers::StreamTriggerRuntime::new(
            crate::triggers::StreamTriggerConfig {
                stream_id: "chat".to_string(),
                window: crate::triggers::StreamWindowConfig::fixed(2),
                backpressure: crate::triggers::StreamBackpressureConfig::default(),
                flow: crate::triggers::StreamFlowConfig::default(),
                gate: None,
            },
            event_log.clone(),
            dispatcher,
        )
        .expect("runtime");

        let mut headers = BTreeMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());
        let mut raw = RawInbound::new(
            "",
            headers,
            serde_json::to_vec(&json!({
                "event": "chat.message",
                "stream": "chat",
                "offset": 1,
                "text": "hello"
            }))
            .unwrap(),
        );
        raw.metadata = json!({"binding_id": "chat"});

        let outcomes = connector
            .push_inbound(&mut runtime, raw)
            .await
            .expect("push through runtime");
        assert!(outcomes.is_empty());
        assert_eq!(runtime.snapshot().pending_events, 1);
        // Tear down the event log installed for this thread at the start of the test.
        reset_active_event_log();
    }
}