1use std::collections::{BTreeMap, VecDeque};
2use std::time::Duration;
3
4use serde::{Deserialize, Serialize};
5use serde_json::{json, Value as JsonValue};
6use time::OffsetDateTime;
7
8use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
9use crate::triggers::test_util::clock;
10use crate::triggers::{
11 Dispatcher, ProviderPayload, SignatureStatus, TraceId, TriggerEvent, TriggerEventId,
12};
13
14use super::{DispatchError, DispatchOutcome, TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_DLQ_TOPIC};
15
/// Event-log topic that receives a `stream_window_emitted` record for every emitted window.
pub const TRIGGER_STREAM_WINDOWS_TOPIC: &str = "trigger.stream.windows";
/// Event-log topic that receives stream status events; each object payload carries a
/// `status` snapshot of the runtime counters.
pub const TRIGGER_STREAM_STATUS_TOPIC: &str = "trigger.stream.status";
/// Event-log topic that stores `stream_gate_decision` records. The runtime also reads this
/// topic back to find previously recorded gate decisions.
pub const TRIGGER_STREAM_GATE_TOPIC: &str = "trigger.stream.gates";
19
/// How pending events are grouped into windows.
///
/// Serialized in `snake_case`. After a window is emitted, `Fixed` and `Tumbling` clear the
/// whole pending queue, while `Sliding` removes only `step` events from its front.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamWindowMode {
    /// Non-overlapping window; drained the same way as `Tumbling` in this file.
    Fixed,
    /// Non-overlapping window; the pending queue is cleared after each emission.
    Tumbling,
    /// Overlapping window; only the configured step is dropped after each emission.
    Sliding,
}
27
/// Count-based window settings for a stream trigger.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamWindowConfig {
    /// Grouping mode applied when a window is emitted.
    pub mode: StreamWindowMode,
    /// Number of pending events that make up one window; validation rejects zero.
    pub size: usize,
    /// Number of events removed from the front of the queue after a `Sliding` window is
    /// emitted. Falls back to 1 when the field is missing during deserialization.
    #[serde(default = "default_window_step")]
    pub step: usize,
}
35
36impl StreamWindowConfig {
37 pub fn fixed(size: usize) -> Self {
38 Self {
39 mode: StreamWindowMode::Fixed,
40 size,
41 step: size.max(1),
42 }
43 }
44
45 pub fn tumbling(size: usize) -> Self {
46 Self {
47 mode: StreamWindowMode::Tumbling,
48 size,
49 step: size.max(1),
50 }
51 }
52
53 pub fn sliding(size: usize, step: usize) -> Self {
54 Self {
55 mode: StreamWindowMode::Sliding,
56 size,
57 step: step.max(1),
58 }
59 }
60
61 fn validate(&self) -> Result<(), DispatchError> {
62 if self.size == 0 {
63 return Err(DispatchError::Local(
64 "stream window size must be positive".to_string(),
65 ));
66 }
67 if self.step == 0 {
68 return Err(DispatchError::Local(
69 "stream window step must be positive".to_string(),
70 ));
71 }
72 Ok(())
73 }
74
75 fn drain_after_emit(&self, pending: &mut VecDeque<TriggerEvent>) {
76 match self.mode {
77 StreamWindowMode::Fixed | StreamWindowMode::Tumbling => pending.clear(),
78 StreamWindowMode::Sliding => {
79 let remove = self.step.min(pending.len());
80 pending.drain(..remove);
81 }
82 }
83 }
84}
85
/// What to do with an event that arrives while the pending queue is already at
/// `max_pending_events`. Serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamOverflowPolicy {
    /// Discard the incoming event and count it as dropped.
    DropNewest,
    /// Discard the oldest pending event (counted as dropped) and admit the incoming one.
    DropOldest,
    /// Write the incoming event to the dead-letter topic instead of admitting it.
    DeadLetterNewest,
}
93
/// Limits on how many events may wait in the pending queue.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamBackpressureConfig {
    /// Queue length at which the overflow policy kicks in; validation rejects zero.
    pub max_pending_events: usize,
    /// Policy applied to an event that arrives while the queue is full.
    pub overflow: StreamOverflowPolicy,
}
99
100impl Default for StreamBackpressureConfig {
101 fn default() -> Self {
102 Self {
103 max_pending_events: 1024,
104 overflow: StreamOverflowPolicy::DeadLetterNewest,
105 }
106 }
107}
108
/// Rate limit applied to incoming events before they are admitted to the queue.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamThrottleConfig {
    /// Number of recorded hits within `period` at which further events are throttled.
    pub max: usize,
    /// Length of the trailing throttle period; (de)serialized as integer milliseconds.
    #[serde(with = "duration_millis")]
    pub period: Duration,
}
115
/// Optional flow-shaping applied to incoming events.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFlowConfig {
    /// When set, admitting an event first removes pending events with the same dedupe key.
    pub debounce_by_dedupe_key: bool,
    /// Throttle settings; `None` disables throttling.
    pub throttle: Option<StreamThrottleConfig>,
}
121
/// Settings for the gate evaluated on each emitted window before dispatch.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateConfig {
    /// Identifier written to gate decision records and used to look up earlier decisions.
    pub gate_id: String,
    /// Prefix combined with the window's dedupe keys to form the decision cache key.
    pub cache_key: String,
    /// Optional id copied onto window envelopes and newly recorded gate decisions.
    pub replay_of_event_id: Option<String>,
}
128
/// Full configuration for one stream trigger runtime.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamTriggerConfig {
    /// Stream identifier; validation rejects an empty or whitespace-only value.
    pub stream_id: String,
    /// Window settings.
    pub window: StreamWindowConfig,
    /// Backpressure settings; defaults apply when missing from serialized input.
    #[serde(default)]
    pub backpressure: StreamBackpressureConfig,
    /// Flow-shaping settings; defaults apply when missing from serialized input.
    #[serde(default)]
    pub flow: StreamFlowConfig,
    /// Optional gate; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub gate: Option<StreamGateConfig>,
}
140
141impl StreamTriggerConfig {
142 pub fn validate(&self) -> Result<(), DispatchError> {
143 if self.stream_id.trim().is_empty() {
144 return Err(DispatchError::Local(
145 "stream id must be a non-empty string".to_string(),
146 ));
147 }
148 self.window.validate()?;
149 if self.backpressure.max_pending_events == 0 {
150 return Err(DispatchError::Local(
151 "stream max_pending_events must be positive".to_string(),
152 ));
153 }
154 if let Some(throttle) = self.flow.throttle.as_ref() {
155 if throttle.max == 0 || throttle.period.is_zero() {
156 return Err(DispatchError::Local(
157 "stream throttle requires positive max and period".to_string(),
158 ));
159 }
160 }
161 Ok(())
162 }
163}
164
/// Summary of one window: which stream it belongs to and which events it contains.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StreamWindowEnvelope {
    /// Identifier of the stream the window was built for.
    pub stream_id: String,
    /// Window identifier; the runtime derives it from the stream id and the member dedupe keys.
    pub window_id: String,
    /// Ids of the member events, in window order.
    pub event_ids: Vec<String>,
    /// Dedupe keys of the member events, in window order.
    pub dedupe_keys: Vec<String>,
    /// Earliest member timestamp as computed by the runtime; `stream_window_summary` leaves
    /// this `None`.
    pub first_occurred_at: Option<OffsetDateTime>,
    /// Latest member timestamp as computed by the runtime; `stream_window_summary` leaves
    /// this `None`.
    pub last_occurred_at: Option<OffsetDateTime>,
    /// Value copied from the gate configuration's `replay_of_event_id`, if any.
    pub replay_of_event_id: Option<String>,
}
175
/// Point-in-time counters describing a stream trigger runtime.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamStatusSnapshot {
    /// Identifier of the stream the counters belong to.
    pub stream_id: String,
    /// Length of the pending queue when the snapshot was taken.
    pub pending_events: usize,
    /// Number of events pushed onto the pending queue.
    pub admitted_events: u64,
    /// Number of events discarded by throttling or by a drop overflow policy.
    pub dropped_events: u64,
    /// Number of events written to the dead-letter topic.
    pub dead_lettered_events: u64,
    /// Number of windows emitted.
    pub emitted_windows: u64,
    /// Number of gate evaluations (fresh or cached) that passed.
    pub gate_passed: u64,
    /// Number of gate evaluations (fresh or cached) that blocked.
    pub gate_blocked: u64,
}
187
/// One gate decision as stored on the gate topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateRecord {
    /// Identifier of the gate that made the decision.
    pub gate_id: String,
    /// Cache key the decision is stored under.
    pub cache_key: String,
    /// Identifier of the window the decision was originally made for.
    pub window_id: String,
    /// `true` when the gate passed, `false` when it blocked.
    pub result: bool,
    /// `true` when the record replays an earlier decision instead of a fresh evaluation.
    pub cached: bool,
    /// Optional explanation supplied by the gate.
    pub reason: Option<String>,
    /// Value copied from the gate configuration's `replay_of_event_id` at decision time.
    pub replay_of_event_id: Option<String>,
}
198
/// Verdict returned by a gate callback for one window.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamGateOutcome {
    /// The window may be dispatched.
    Pass { reason: Option<String> },
    /// The window must not be dispatched.
    Block { reason: Option<String> },
}

impl StreamGateOutcome {
    /// Returns `true` for `Pass` and `false` for `Block`.
    fn result(&self) -> bool {
        match self {
            Self::Pass { .. } => true,
            Self::Block { .. } => false,
        }
    }

    /// Returns a copy of the optional reason carried by either variant.
    fn reason(&self) -> Option<String> {
        let (Self::Pass { reason } | Self::Block { reason }) = self;
        reason.clone()
    }
}
216
/// Mutable bookkeeping owned by a `StreamTriggerRuntime`.
#[derive(Debug, Default)]
struct StreamRuntimeState {
    /// Admitted events waiting to be grouped into a window, oldest first.
    pending: VecDeque<TriggerEvent>,
    /// Timestamps of events that passed the throttle, oldest first.
    throttle_hits: VecDeque<OffsetDateTime>,
    /// Running counters; `pending_events` is filled in when a snapshot is taken.
    status: StreamStatusSnapshot,
}
223
/// Runtime that turns a stream of trigger events into windows, gates them, and dispatches
/// the windows that pass.
pub struct StreamTriggerRuntime {
    /// Validated stream configuration.
    config: StreamTriggerConfig,
    /// Event log used for window, status, gate, lifecycle, and dead-letter records.
    event_log: std::sync::Arc<AnyEventLog>,
    /// Dispatcher that receives the synthesized window events.
    dispatcher: Dispatcher,
    /// Pending queue, throttle history, and counters.
    state: StreamRuntimeState,
}
230
231impl StreamTriggerRuntime {
232 pub fn new(
233 config: StreamTriggerConfig,
234 event_log: std::sync::Arc<AnyEventLog>,
235 dispatcher: Dispatcher,
236 ) -> Result<Self, DispatchError> {
237 config.validate()?;
238 let status = StreamStatusSnapshot {
239 stream_id: config.stream_id.clone(),
240 ..Default::default()
241 };
242 Ok(Self {
243 config,
244 event_log,
245 dispatcher,
246 state: StreamRuntimeState {
247 status,
248 ..Default::default()
249 },
250 })
251 }
252
253 pub fn snapshot(&self) -> StreamStatusSnapshot {
254 let mut snapshot = self.state.status.clone();
255 snapshot.pending_events = self.state.pending.len();
256 snapshot
257 }
258
259 pub async fn push_event(
260 &mut self,
261 event: TriggerEvent,
262 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
263 self.push_event_with_gate(event, |_| StreamGateOutcome::Pass { reason: None })
264 .await
265 }
266
267 pub async fn push_event_with_gate(
268 &mut self,
269 event: TriggerEvent,
270 gate: impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
271 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
272 self.append_status_event(
273 "stream_event_received",
274 json!({
275 "stream_id": self.config.stream_id,
276 "event_id": event.id.0,
277 "dedupe_key": event.dedupe_key,
278 "provider": event.provider.as_str(),
279 "kind": event.kind,
280 }),
281 )
282 .await?;
283
284 if !self.apply_throttle(&event).await? {
285 return Ok(Vec::new());
286 }
287
288 self.admit_event(event).await?;
289 let windows = self.emit_ready_windows().await?;
290 let mut outcomes = Vec::new();
291 for (envelope, window_event) in windows {
292 if !self.evaluate_gate(&envelope, &gate).await? {
293 continue;
294 }
295 outcomes.extend(self.dispatcher.dispatch_event(window_event).await?);
296 }
297 Ok(outcomes)
298 }
299
 /// Applies debouncing and backpressure to `event`, then pushes it onto the pending queue
 /// unless the overflow policy rejects it.
 ///
 /// Counters are updated before the matching status event is appended, so the snapshot
 /// embedded in that status event already reflects the change.
 ///
 /// # Errors
 ///
 /// Propagates any `DispatchError` from appending to the event log.
 async fn admit_event(&mut self, event: TriggerEvent) -> Result<(), DispatchError> {
     if self.config.flow.debounce_by_dedupe_key {
         // Debounce: drop every pending event that shares the incoming dedupe key.
         let before = self.state.pending.len();
         self.state
             .pending
             .retain(|pending| pending.dedupe_key != event.dedupe_key);
         // Only log when something was actually removed.
         if self.state.pending.len() != before {
             self.append_status_event(
                 "stream_event_debounced",
                 json!({
                     "stream_id": self.config.stream_id,
                     "dedupe_key": event.dedupe_key,
                 }),
             )
             .await?;
         }
     }

     // Backpressure: the queue is full, so apply the configured overflow policy.
     if self.state.pending.len() >= self.config.backpressure.max_pending_events {
         match self.config.backpressure.overflow {
             StreamOverflowPolicy::DropNewest => {
                 self.state.status.dropped_events =
                     self.state.status.dropped_events.saturating_add(1);
                 self.append_status_event(
                     "stream_event_dropped",
                     json!({
                         "stream_id": self.config.stream_id,
                         "event_id": event.id.0,
                         "reason": "backpressure_drop_newest",
                         "max_pending_events": self.config.backpressure.max_pending_events,
                     }),
                 )
                 .await?;
                 // The incoming event is discarded and never admitted.
                 return Ok(());
             }
             StreamOverflowPolicy::DropOldest => {
                 // Make room by evicting the oldest pending event; the incoming event is
                 // admitted below.
                 if let Some(dropped) = self.state.pending.pop_front() {
                     self.state.status.dropped_events =
                         self.state.status.dropped_events.saturating_add(1);
                     self.append_status_event(
                         "stream_event_dropped",
                         json!({
                             "stream_id": self.config.stream_id,
                             "event_id": dropped.id.0,
                             "reason": "backpressure_drop_oldest",
                             "max_pending_events": self.config.backpressure.max_pending_events,
                         }),
                     )
                     .await?;
                 }
             }
             StreamOverflowPolicy::DeadLetterNewest => {
                 // The incoming event goes to the dead-letter topic instead of the queue.
                 self.dead_letter_event(&event, "backpressure_queue_full")
                     .await?;
                 return Ok(());
             }
         }
     }

     self.state.status.admitted_events = self.state.status.admitted_events.saturating_add(1);
     self.state.pending.push_back(event);
     Ok(())
 }
363
364 async fn emit_ready_windows(
365 &mut self,
366 ) -> Result<Vec<(StreamWindowEnvelope, TriggerEvent)>, DispatchError> {
367 let mut windows = Vec::new();
368 while self.state.pending.len() >= self.config.window.size {
369 let members = self
370 .state
371 .pending
372 .iter()
373 .take(self.config.window.size)
374 .cloned()
375 .collect::<Vec<_>>();
376 let envelope = self.window_envelope(&members);
377 let event = self.window_event(&members, &envelope)?;
378 self.append_window_event(&envelope).await?;
379 self.state.status.emitted_windows = self.state.status.emitted_windows.saturating_add(1);
380 windows.push((envelope, event));
381 self.config.window.drain_after_emit(&mut self.state.pending);
382 }
383 Ok(windows)
384 }
385
386 fn window_envelope(&self, members: &[TriggerEvent]) -> StreamWindowEnvelope {
387 let event_ids = members
388 .iter()
389 .map(|event| event.id.0.clone())
390 .collect::<Vec<_>>();
391 let dedupe_keys = members
392 .iter()
393 .map(|event| event.dedupe_key.clone())
394 .collect::<Vec<_>>();
395 let first_occurred_at = members
396 .iter()
397 .filter_map(|event| event.occurred_at)
398 .min()
399 .or_else(|| members.first().map(|event| event.received_at));
400 let last_occurred_at = members
401 .iter()
402 .filter_map(|event| event.occurred_at)
403 .max()
404 .or_else(|| members.last().map(|event| event.received_at));
405 let source = dedupe_keys.join("+");
406 StreamWindowEnvelope {
407 stream_id: self.config.stream_id.clone(),
408 window_id: format!("stream_window:{}:{source}", self.config.stream_id),
409 event_ids,
410 dedupe_keys,
411 first_occurred_at,
412 last_occurred_at,
413 replay_of_event_id: self
414 .config
415 .gate
416 .as_ref()
417 .and_then(|gate| gate.replay_of_event_id.clone()),
418 }
419 }
420
421 fn window_event(
422 &self,
423 members: &[TriggerEvent],
424 envelope: &StreamWindowEnvelope,
425 ) -> Result<TriggerEvent, DispatchError> {
426 let first = members
427 .first()
428 .ok_or_else(|| DispatchError::Local("stream window cannot be empty".to_string()))?;
429 let mut headers = first.headers.clone();
430 headers.insert("harn_stream_id".to_string(), envelope.stream_id.clone());
431 headers.insert(
432 "harn_stream_window_id".to_string(),
433 envelope.window_id.clone(),
434 );
435 headers.insert(
436 "harn_stream_source_event_ids".to_string(),
437 envelope.event_ids.join(","),
438 );
439 let batch = members
440 .iter()
441 .map(|event| serde_json::to_value(event).map_err(|error| error.to_string()))
442 .collect::<Result<Vec<_>, _>>()
443 .map_err(DispatchError::Serde)?;
444 let mut window_event = first.clone();
445 window_event.id = TriggerEventId::new();
446 window_event.kind = format!("{}.window", first.kind);
447 window_event.dedupe_key = envelope.window_id.clone();
448 window_event.trace_id = TraceId::new();
449 window_event.headers = headers;
450 window_event.batch = Some(batch);
451 Ok(window_event)
452 }
453
454 async fn evaluate_gate(
455 &mut self,
456 envelope: &StreamWindowEnvelope,
457 gate: &impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
458 ) -> Result<bool, DispatchError> {
459 let Some(config) = self.config.gate.clone() else {
460 return Ok(true);
461 };
462 let cache_key = format!("{}:{}", config.cache_key, envelope.dedupe_keys.join("+"));
463 if let Some(record) = self.read_gate_record(&config.gate_id, &cache_key).await? {
464 self.append_gate_record(StreamGateRecord {
465 cached: true,
466 ..record.clone()
467 })
468 .await?;
469 if record.result {
470 self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
471 } else {
472 self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
473 }
474 return Ok(record.result);
475 }
476
477 let outcome = gate(envelope);
478 let record = StreamGateRecord {
479 gate_id: config.gate_id,
480 cache_key,
481 window_id: envelope.window_id.clone(),
482 result: outcome.result(),
483 cached: false,
484 reason: outcome.reason(),
485 replay_of_event_id: config.replay_of_event_id,
486 };
487 self.append_gate_record(record.clone()).await?;
488 if record.result {
489 self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
490 } else {
491 self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
492 self.append_status_event(
493 "stream_window_gate_blocked",
494 json!({
495 "stream_id": self.config.stream_id,
496 "window_id": envelope.window_id,
497 "gate_id": record.gate_id,
498 "reason": record.reason,
499 }),
500 )
501 .await?;
502 }
503 Ok(record.result)
504 }
505
506 async fn apply_throttle(&mut self, event: &TriggerEvent) -> Result<bool, DispatchError> {
507 let Some(throttle) = self.config.flow.throttle.clone() else {
508 return Ok(true);
509 };
510 let now = event.occurred_at.unwrap_or(event.received_at);
511 trim_hits(&mut self.state.throttle_hits, now, throttle.period);
512 if self.state.throttle_hits.len() >= throttle.max {
513 self.state.status.dropped_events = self.state.status.dropped_events.saturating_add(1);
514 self.append_status_event(
515 "stream_event_throttled",
516 json!({
517 "stream_id": self.config.stream_id,
518 "event_id": event.id.0,
519 "max": throttle.max,
520 "period_ms": throttle.period.as_millis(),
521 }),
522 )
523 .await?;
524 return Ok(false);
525 }
526 self.state.throttle_hits.push_back(now);
527 Ok(true)
528 }
529
530 async fn dead_letter_event(
531 &mut self,
532 event: &TriggerEvent,
533 reason: &str,
534 ) -> Result<(), DispatchError> {
535 self.state.status.dead_lettered_events =
536 self.state.status.dead_lettered_events.saturating_add(1);
537 self.append_topic_event(
538 TRIGGER_DLQ_TOPIC,
539 "stream_dead_lettered",
540 json!({
541 "stream_id": self.config.stream_id,
542 "event": event,
543 "reason": reason,
544 "pending_events": self.state.pending.len(),
545 "max_pending_events": self.config.backpressure.max_pending_events,
546 }),
547 )
548 .await?;
549 self.append_status_event(
550 "stream_event_dead_lettered",
551 json!({
552 "stream_id": self.config.stream_id,
553 "event_id": event.id.0,
554 "reason": reason,
555 }),
556 )
557 .await
558 }
559
560 async fn append_window_event(
561 &self,
562 envelope: &StreamWindowEnvelope,
563 ) -> Result<(), DispatchError> {
564 self.append_topic_event(
565 TRIGGER_STREAM_WINDOWS_TOPIC,
566 "stream_window_emitted",
567 serde_json::to_value(envelope)
568 .map_err(|error| DispatchError::Serde(error.to_string()))?,
569 )
570 .await?;
571 self.append_topic_event(
572 TRIGGERS_LIFECYCLE_TOPIC,
573 "StreamWindowEmitted",
574 json!({
575 "stream_id": envelope.stream_id,
576 "window_id": envelope.window_id,
577 "event_ids": envelope.event_ids,
578 "replay_of_event_id": envelope.replay_of_event_id,
579 }),
580 )
581 .await
582 }
583
584 async fn append_gate_record(&self, record: StreamGateRecord) -> Result<(), DispatchError> {
585 self.append_topic_event(
586 TRIGGER_STREAM_GATE_TOPIC,
587 "stream_gate_decision",
588 serde_json::to_value(record)
589 .map_err(|error| DispatchError::Serde(error.to_string()))?,
590 )
591 .await
592 }
593
594 async fn read_gate_record(
595 &self,
596 gate_id: &str,
597 cache_key: &str,
598 ) -> Result<Option<StreamGateRecord>, DispatchError> {
599 let topic =
600 Topic::new(TRIGGER_STREAM_GATE_TOPIC).expect("static stream gate topic is valid");
601 let records = self
602 .event_log
603 .read_range(&topic, None, usize::MAX)
604 .await
605 .map_err(DispatchError::from)?;
606 Ok(records
607 .into_iter()
608 .rev()
609 .filter(|(_, event)| event.kind == "stream_gate_decision")
610 .filter_map(|(_, event)| serde_json::from_value::<StreamGateRecord>(event.payload).ok())
611 .find(|record| record.gate_id == gate_id && record.cache_key == cache_key))
612 }
613
614 async fn append_status_event(
615 &self,
616 kind: &str,
617 mut payload: JsonValue,
618 ) -> Result<(), DispatchError> {
619 if let JsonValue::Object(ref mut object) = payload {
620 object.insert("status".to_string(), json!(self.snapshot()));
621 }
622 self.append_topic_event(TRIGGER_STREAM_STATUS_TOPIC, kind, payload)
623 .await
624 }
625
626 async fn append_topic_event(
627 &self,
628 topic: &str,
629 kind: &str,
630 payload: JsonValue,
631 ) -> Result<(), DispatchError> {
632 self.event_log
633 .append(
634 &Topic::new(topic).expect("static stream trigger topic is valid"),
635 LogEvent::new(kind, payload),
636 )
637 .await
638 .map(|_| ())
639 .map_err(DispatchError::from)
640 }
641}
642
643pub fn stream_window_summary(event: &TriggerEvent) -> Option<StreamWindowEnvelope> {
644 let batch = event.batch.as_ref()?;
645 let event_ids = batch
646 .iter()
647 .filter_map(|member| member.get("id"))
648 .filter_map(JsonValue::as_str)
649 .map(ToString::to_string)
650 .collect::<Vec<_>>();
651 Some(StreamWindowEnvelope {
652 stream_id: event
653 .headers
654 .get("harn_stream_id")
655 .cloned()
656 .unwrap_or_default(),
657 window_id: event
658 .headers
659 .get("harn_stream_window_id")
660 .cloned()
661 .unwrap_or_else(|| event.dedupe_key.clone()),
662 event_ids,
663 dedupe_keys: batch
664 .iter()
665 .filter_map(|member| member.get("dedupe_key"))
666 .filter_map(JsonValue::as_str)
667 .map(ToString::to_string)
668 .collect(),
669 first_occurred_at: None,
670 last_occurred_at: None,
671 replay_of_event_id: None,
672 })
673}
674
675pub fn stream_fixture_event(
676 provider: impl Into<String>,
677 kind: impl Into<String>,
678 stream: impl Into<String>,
679 offset: u64,
680 payload: JsonValue,
681) -> TriggerEvent {
682 let provider = provider.into();
683 let stream = stream.into();
684 let kind = kind.into();
685 let provider_id = crate::triggers::ProviderId::from(provider.clone());
686 let raw = json!({
687 "event": kind,
688 "stream": stream,
689 "offset": offset,
690 "value": payload,
691 });
692 TriggerEvent::new(
693 provider_id.clone(),
694 kind.clone(),
695 Some(clock::now_utc()),
696 format!("{stream}:{offset}"),
697 None,
698 BTreeMap::new(),
699 ProviderPayload::normalize(&provider_id, &kind, &BTreeMap::new(), raw).unwrap_or_else(
700 |_| {
701 ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Webhook(
702 crate::triggers::GenericWebhookPayload {
703 source: Some("stream_fixture".to_string()),
704 content_type: Some("application/json".to_string()),
705 raw: json!({}),
706 },
707 ))
708 },
709 ),
710 SignatureStatus::Unsigned,
711 )
712}
713
714fn trim_hits(hits: &mut VecDeque<OffsetDateTime>, now: OffsetDateTime, period: Duration) {
715 let period = time::Duration::try_from(period).unwrap_or_default();
716 while hits.front().is_some_and(|hit| now - *hit >= period) {
717 hits.pop_front();
718 }
719}
720
/// Serde default for `StreamWindowConfig::step`: advance one event at a time.
fn default_window_step() -> usize {
    const DEFAULT_STEP: usize = 1;
    DEFAULT_STEP
}
724
725mod duration_millis {
726 use std::time::Duration;
727
728 use serde::{Deserialize, Deserializer, Serializer};
729
730 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
731 where
732 S: Serializer,
733 {
734 serializer.serialize_u64(duration.as_millis() as u64)
735 }
736
737 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
738 where
739 D: Deserializer<'de>,
740 {
741 Ok(Duration::from_millis(u64::deserialize(deserializer)?))
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use std::cell::Cell;
748 use std::rc::Rc;
749 use std::sync::Arc;
750
751 use serde_json::json;
752
753 use crate::event_log::{install_default_for_base_dir, EventLog};
754 use crate::stdlib::register_vm_stdlib;
755 use crate::triggers::dispatcher::RetryPolicy;
756 use crate::triggers::TriggerRetryConfig;
757 use crate::triggers::{
758 clear_trigger_registry, install_manifest_triggers, ProviderId, TriggerBindingSource,
759 TriggerBindingSpec, TriggerHandlerSpec,
760 };
761 use crate::trust_graph::AutonomyTier;
762 use crate::vm::Vm;
763
764 use super::*;
765
766 async fn read_topic(
767 log: Arc<AnyEventLog>,
768 topic: &str,
769 ) -> Vec<(u64, crate::event_log::LogEvent)> {
770 log.read_range(&Topic::new(topic).unwrap(), None, usize::MAX)
771 .await
772 .unwrap()
773 }
774
 /// Test fixture: resets thread-local state and the trigger registry, installs an event
 /// log in a fresh temp dir, loads `source` as `lib.harn`, and registers its `local_fn`
 /// export as the handler of a manifest trigger matching `issues.opened.window` events
 /// from the `kafka` provider.
 ///
 /// Returns the temp dir (to keep it alive), the event log, and a dispatcher bound to both
 /// the VM and the log.
 async fn stream_dispatcher_fixture(
     source: &str,
 ) -> (tempfile::TempDir, Arc<AnyEventLog>, Dispatcher) {
     crate::reset_thread_local_state();
     clear_trigger_registry();
     let dir = tempfile::tempdir().expect("tempdir");
     let log = install_default_for_base_dir(dir.path()).expect("event log");
     let lib_path = dir.path().join("lib.harn");
     std::fs::write(&lib_path, source).expect("write harn source");

     let mut vm = Vm::new();
     register_vm_stdlib(&mut vm);
     vm.set_source_dir(dir.path());
     let exports = vm
         .load_module_exports(&lib_path)
         .await
         .expect("load stream handler");
     // The handler source must export a function named `local_fn`.
     let handler = exports.get("local_fn").expect("local_fn export").clone();
     install_manifest_triggers(vec![TriggerBindingSpec {
         id: "stream-window-handler".to_string(),
         source: TriggerBindingSource::Manifest,
         kind: "stream".to_string(),
         provider: ProviderId::from("kafka"),
         autonomy_tier: AutonomyTier::ActAuto,
         handler: TriggerHandlerSpec::Local {
             raw: "local_fn".to_string(),
             closure: handler,
         },
         dispatch_priority: crate::triggers::WorkerQueuePriority::Normal,
         when: None,
         when_budget: None,
         retry: TriggerRetryConfig::new(1, RetryPolicy::Linear { delay_ms: 1 }),
         // Window events carry the member kind suffixed with `.window`.
         match_events: vec!["issues.opened.window".to_string()],
         dedupe_key: Some("event.dedupe_key".to_string()),
         dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
         filter: None,
         daily_cost_usd: None,
         hourly_cost_usd: None,
         max_autonomous_decisions_per_hour: None,
         max_autonomous_decisions_per_day: None,
         on_budget_exhausted: crate::triggers::TriggerBudgetExhaustionStrategy::False,
         max_concurrent: None,
         flow_control: crate::triggers::TriggerFlowControlConfig::default(),
         aggregation: None,
         manifest_path: None,
         package_name: Some("workspace".to_string()),
         definition_fingerprint: "stream-window-handler:v1".to_string(),
     }])
     .await
     .expect("install stream trigger");

     (dir, log.clone(), Dispatcher::with_event_log(vm, log))
 }
828
 /// A tumbling window of two events is gated once and dispatched; replaying the same two
 /// events reuses the recorded gate decision instead of calling the gate again.
 #[tokio::test(flavor = "current_thread")]
 async fn chat_stream_windows_gate_and_dispatch_deterministically() {
     let local = tokio::task::LocalSet::new();
     local
         .run_until(async {
             let (_dir, log, dispatcher) = stream_dispatcher_fixture(
                 r#"
import "std/triggers"

pub fn local_fn(event: TriggerEvent) -> int {
 return len(event.batch ?? [])
}
"#,
             )
             .await;

             let mut runtime = StreamTriggerRuntime::new(
                 StreamTriggerConfig {
                     stream_id: "chat:triage".to_string(),
                     window: StreamWindowConfig::tumbling(2),
                     backpressure: StreamBackpressureConfig::default(),
                     flow: StreamFlowConfig::default(),
                     gate: Some(StreamGateConfig {
                         gate_id: "chat-llm-gate".to_string(),
                         cache_key: "chat:v1".to_string(),
                         replay_of_event_id: None,
                     }),
                 },
                 log.clone(),
                 dispatcher,
             )
             .unwrap();

             // Counts how often a gate callback actually runs.
             let calls = Rc::new(Cell::new(0));
             for offset in 0..2 {
                 let calls = calls.clone();
                 let outcomes = runtime
                     .push_event_with_gate(
                         stream_fixture_event(
                             "kafka",
                             "issues.opened",
                             "chat",
                             offset,
                             json!({"text": format!("message {offset}")}),
                         ),
                         move |_| {
                             calls.set(calls.get() + 1);
                             StreamGateOutcome::Pass {
                                 reason: Some("fixture-positive".to_string()),
                             }
                         },
                     )
                     .await
                     .unwrap();
                 // Only the second event completes the window and triggers a dispatch.
                 if offset == 1 {
                     assert_eq!(outcomes.len(), 1);
                     assert_eq!(outcomes[0].result, Some(json!(2)));
                 } else {
                     assert!(outcomes.is_empty());
                 }
             }
             assert_eq!(calls.get(), 1);

             let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
             assert_eq!(
                 gate_events
                     .iter()
                     .filter(|(_, event)| event.kind == "stream_gate_decision")
                     .count(),
                 1
             );

             let first_window_id = gate_events[0].1.payload["window_id"]
                 .as_str()
                 .unwrap()
                 .to_string();
             runtime.config.gate.as_mut().unwrap().replay_of_event_id =
                 Some(first_window_id.clone());

             // Replay the same offsets: the dedupe keys match, so the stored decision is
             // reused and this blocking gate must never run.
             for offset in 0..2 {
                 let calls = calls.clone();
                 runtime
                     .push_event_with_gate(
                         stream_fixture_event(
                             "kafka",
                             "issues.opened",
                             "chat",
                             offset,
                             json!({"text": format!("message {offset}")}),
                         ),
                         move |_| {
                             calls.set(calls.get() + 1);
                             StreamGateOutcome::Block {
                                 reason: Some("should-not-run".to_string()),
                             }
                         },
                     )
                     .await
                     .unwrap();
             }
             assert_eq!(calls.get(), 1);
             let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
             assert!(gate_events.iter().any(|(_, event)| {
                 event.kind == "stream_gate_decision"
                     && event.payload["cached"] == json!(true)
                     && event.payload["window_id"] == json!(first_window_id)
             }));
         })
         .await;
 }
939
 /// With a one-slot pending queue and the dead-letter overflow policy, the second event is
 /// written to the dead-letter topic while the first stays pending.
 #[tokio::test(flavor = "current_thread")]
 async fn issue_feed_backpressure_dead_letters_overflow() {
     let local = tokio::task::LocalSet::new();
     local
         .run_until(async {
             let dir = tempfile::tempdir().unwrap();
             let log = install_default_for_base_dir(dir.path()).unwrap();
             let mut vm = Vm::new();
             register_vm_stdlib(&mut vm);
             vm.set_source_dir(dir.path());
             let dispatcher = Dispatcher::with_event_log(vm, log.clone());
             let mut runtime = StreamTriggerRuntime::new(
                 StreamTriggerConfig {
                     stream_id: "issue-feed".to_string(),
                     // The window needs three events, so none is emitted in this test.
                     window: StreamWindowConfig::fixed(3),
                     backpressure: StreamBackpressureConfig {
                         max_pending_events: 1,
                         overflow: StreamOverflowPolicy::DeadLetterNewest,
                     },
                     flow: StreamFlowConfig::default(),
                     gate: None,
                 },
                 log.clone(),
                 dispatcher,
             )
             .unwrap();

             runtime
                 .push_event(stream_fixture_event(
                     "kafka",
                     "issues.opened",
                     "issues",
                     1,
                     json!({"number": 1}),
                 ))
                 .await
                 .unwrap();
             // The queue is already full, so this event overflows.
             runtime
                 .push_event(stream_fixture_event(
                     "kafka",
                     "issues.opened",
                     "issues",
                     2,
                     json!({"number": 2}),
                 ))
                 .await
                 .unwrap();

             let snapshot = runtime.snapshot();
             assert_eq!(snapshot.pending_events, 1);
             assert_eq!(snapshot.dead_lettered_events, 1);

             let dlq = read_topic(log.clone(), TRIGGER_DLQ_TOPIC).await;
             assert!(dlq.iter().any(|(_, event)| {
                 event.kind == "stream_dead_lettered"
                     && event.payload["reason"] == json!("backpressure_queue_full")
             }));
         })
         .await;
 }
1000
 /// A sliding window of size two and step one emits two overlapping windows for three
 /// events, and both are dispatched.
 #[tokio::test(flavor = "current_thread")]
 async fn sliding_windows_emit_overlapping_batches() {
     let local = tokio::task::LocalSet::new();
     local
         .run_until(async {
             let (_dir, log, dispatcher) = stream_dispatcher_fixture(
                 r#"
import "std/triggers"

pub fn local_fn(event: TriggerEvent) -> int {
 return len(event.batch ?? [])
}
"#,
             )
             .await;
             let mut runtime = StreamTriggerRuntime::new(
                 StreamTriggerConfig {
                     stream_id: "sliding".to_string(),
                     window: StreamWindowConfig::sliding(2, 1),
                     backpressure: StreamBackpressureConfig::default(),
                     flow: StreamFlowConfig::default(),
                     gate: None,
                 },
                 log.clone(),
                 dispatcher,
             )
             .unwrap();

             // Sum the dispatch outcomes produced across all three pushes.
             let mut dispatched = 0;
             for offset in 1..=3 {
                 dispatched += runtime
                     .push_event(stream_fixture_event(
                         "kafka",
                         "issues.opened",
                         "sliding",
                         offset,
                         json!({"offset": offset}),
                     ))
                     .await
                     .unwrap()
                     .len();
             }

             assert_eq!(dispatched, 2);
             assert_eq!(runtime.snapshot().emitted_windows, 2);
             let windows = read_topic(log, TRIGGER_STREAM_WINDOWS_TOPIC).await;
             assert_eq!(windows.len(), 2);
         })
         .await;
 }
1051}