1use std::collections::{BTreeMap, VecDeque};
2use std::time::Duration;
3
4use serde::{Deserialize, Serialize};
5use serde_json::{json, Value as JsonValue};
6use time::OffsetDateTime;
7
8use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
9use crate::triggers::test_util::clock;
10use crate::triggers::{
11 Dispatcher, ProviderPayload, SignatureStatus, TraceId, TriggerEvent, TriggerEventId,
12};
13
14use super::{DispatchError, DispatchOutcome, TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_DLQ_TOPIC};
15
/// Event-log topic carrying emitted stream window envelopes.
pub const TRIGGER_STREAM_WINDOWS_TOPIC: &str = "trigger.stream.windows";
/// Event-log topic carrying per-stream status/telemetry events.
pub const TRIGGER_STREAM_STATUS_TOPIC: &str = "trigger.stream.status";
/// Event-log topic carrying gate decisions; doubles as the gate decision cache.
pub const TRIGGER_STREAM_GATE_TOPIC: &str = "trigger.stream.gates";
19
/// How pending events are grouped into windows.
///
/// Serialized in snake_case (`fixed`, `tumbling`, `sliding`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamWindowMode {
    /// Emit a window of `size` events, then clear all pending events.
    Fixed,
    /// Same drain behavior as `Fixed` in this implementation
    /// (see `StreamWindowConfig::drain_after_emit`).
    Tumbling,
    /// Emit overlapping windows, advancing by `step` events after each emit.
    Sliding,
}
27
/// Windowing parameters for a stream trigger.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamWindowConfig {
    /// Grouping strategy (fixed, tumbling, or sliding).
    pub mode: StreamWindowMode,
    /// Number of events per emitted window; must be positive (validated).
    pub size: usize,
    /// Advance between sliding windows; defaults to 1 when omitted from a
    /// serialized config. Fixed/tumbling modes clear the whole queue on
    /// emit, so `step` only matters for `Sliding`.
    #[serde(default = "default_window_step")]
    pub step: usize,
}
35
36impl StreamWindowConfig {
37 pub fn fixed(size: usize) -> Self {
38 Self {
39 mode: StreamWindowMode::Fixed,
40 size,
41 step: size.max(1),
42 }
43 }
44
45 pub fn tumbling(size: usize) -> Self {
46 Self {
47 mode: StreamWindowMode::Tumbling,
48 size,
49 step: size.max(1),
50 }
51 }
52
53 pub fn sliding(size: usize, step: usize) -> Self {
54 Self {
55 mode: StreamWindowMode::Sliding,
56 size,
57 step: step.max(1),
58 }
59 }
60
61 fn validate(&self) -> Result<(), DispatchError> {
62 if self.size == 0 {
63 return Err(DispatchError::Local(
64 "stream window size must be positive".to_string(),
65 ));
66 }
67 if self.step == 0 {
68 return Err(DispatchError::Local(
69 "stream window step must be positive".to_string(),
70 ));
71 }
72 Ok(())
73 }
74
75 fn drain_after_emit(&self, pending: &mut VecDeque<TriggerEvent>) {
76 match self.mode {
77 StreamWindowMode::Fixed | StreamWindowMode::Tumbling => pending.clear(),
78 StreamWindowMode::Sliding => {
79 let remove = self.step.min(pending.len());
80 pending.drain(..remove);
81 }
82 }
83 }
84}
85
/// What to do with an incoming event when the pending queue is full.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamOverflowPolicy {
    /// Discard the incoming event and keep the queue untouched.
    DropNewest,
    /// Discard the oldest queued event to make room for the incoming one.
    DropOldest,
    /// Route the incoming event to the dead-letter topic.
    DeadLetterNewest,
}
93
/// Backpressure limits for a stream's pending-event queue.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamBackpressureConfig {
    /// Maximum queued events before the overflow policy kicks in; must be
    /// positive (checked in `StreamTriggerConfig::validate`).
    pub max_pending_events: usize,
    /// Policy applied once the queue is full.
    pub overflow: StreamOverflowPolicy,
}
99
100impl Default for StreamBackpressureConfig {
101 fn default() -> Self {
102 Self {
103 max_pending_events: 1024,
104 overflow: StreamOverflowPolicy::DeadLetterNewest,
105 }
106 }
107}
108
/// Rate limit applied to incoming events: at most `max` events per rolling
/// `period`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamThrottleConfig {
    /// Maximum admitted events within a rolling `period`; must be positive.
    pub max: usize,
    /// Rolling throttle window, serialized as integer milliseconds.
    #[serde(with = "duration_millis")]
    pub period: Duration,
}
115
/// Flow-control options applied before events enter the pending queue.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFlowConfig {
    /// When set, a newly admitted event evicts any pending event sharing
    /// its dedupe key.
    pub debounce_by_dedupe_key: bool,
    /// Optional rolling-window rate limit; `None` disables throttling.
    pub throttle: Option<StreamThrottleConfig>,
}
121
/// Configuration for the optional per-window gate.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateConfig {
    /// Identifier of the gate; cached decisions are matched on it.
    pub gate_id: String,
    /// Cache-key prefix, combined with the window's dedupe keys when looking
    /// up prior decisions on the gate topic.
    pub cache_key: String,
    /// When replaying a previous window, the originating event id recorded on
    /// new gate records and window envelopes.
    pub replay_of_event_id: Option<String>,
}
128
/// Full configuration for one stream trigger.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamTriggerConfig {
    /// Non-empty (after trimming) identifier for this stream.
    pub stream_id: String,
    /// Windowing strategy.
    pub window: StreamWindowConfig,
    /// Queue limits; defaults to 1024 events with dead-letter overflow.
    #[serde(default)]
    pub backpressure: StreamBackpressureConfig,
    /// Debounce/throttle options; defaults to everything disabled.
    #[serde(default)]
    pub flow: StreamFlowConfig,
    /// Optional per-window gate; omitted from serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub gate: Option<StreamGateConfig>,
}
140
141impl StreamTriggerConfig {
142 pub fn validate(&self) -> Result<(), DispatchError> {
143 if self.stream_id.trim().is_empty() {
144 return Err(DispatchError::Local(
145 "stream id must be a non-empty string".to_string(),
146 ));
147 }
148 self.window.validate()?;
149 if self.backpressure.max_pending_events == 0 {
150 return Err(DispatchError::Local(
151 "stream max_pending_events must be positive".to_string(),
152 ));
153 }
154 if let Some(throttle) = self.flow.throttle.as_ref() {
155 if throttle.max == 0 || throttle.period.is_zero() {
156 return Err(DispatchError::Local(
157 "stream throttle requires positive max and period".to_string(),
158 ));
159 }
160 }
161 Ok(())
162 }
163}
164
/// Metadata describing one emitted window; logged to the windows topic and
/// passed to gate callbacks.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StreamWindowEnvelope {
    /// Owning stream.
    pub stream_id: String,
    /// Deterministic id derived from the stream id and member dedupe keys.
    pub window_id: String,
    /// Ids of the member events, in queue order.
    pub event_ids: Vec<String>,
    /// Dedupe keys of the member events, in queue order.
    pub dedupe_keys: Vec<String>,
    /// Earliest member `occurred_at` (falls back to first `received_at`).
    pub first_occurred_at: Option<OffsetDateTime>,
    /// Latest member `occurred_at` (falls back to last `received_at`).
    pub last_occurred_at: Option<OffsetDateTime>,
    /// Set when this window is configured as a replay of a prior event.
    pub replay_of_event_id: Option<String>,
}
175
/// Counters describing a stream's runtime activity.
///
/// `pending_events` is derived at snapshot time; every other counter only
/// ever increases (saturating adds).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamStatusSnapshot {
    pub stream_id: String,
    /// Live queue length at the time the snapshot was taken.
    pub pending_events: usize,
    /// Events accepted into the pending queue.
    pub admitted_events: u64,
    /// Events rejected by backpressure drops or throttling.
    pub dropped_events: u64,
    /// Events routed to the dead-letter topic.
    pub dead_lettered_events: u64,
    /// Windows emitted to the windows topic.
    pub emitted_windows: u64,
    /// Gate decisions (fresh or cached) that allowed dispatch.
    pub gate_passed: u64,
    /// Gate decisions (fresh or cached) that blocked dispatch.
    pub gate_blocked: u64,
}
187
/// A single gate decision appended to the gate topic; these records double
/// as the persistent decision cache consulted by `read_gate_record`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateRecord {
    pub gate_id: String,
    /// Full cache key (`StreamGateConfig::cache_key` + window dedupe keys).
    pub cache_key: String,
    pub window_id: String,
    /// `true` = pass (window dispatched), `false` = blocked.
    pub result: bool,
    /// `true` when this record was replayed from a prior cached decision.
    pub cached: bool,
    pub reason: Option<String>,
    pub replay_of_event_id: Option<String>,
}
198
/// Decision produced by a window gate: either let the window dispatch or
/// block it, optionally carrying a human-readable reason.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamGateOutcome {
    Pass { reason: Option<String> },
    Block { reason: Option<String> },
}

impl StreamGateOutcome {
    /// `true` when the window may dispatch.
    fn result(&self) -> bool {
        match self {
            Self::Pass { .. } => true,
            Self::Block { .. } => false,
        }
    }

    /// The optional reason attached to either variant, cloned for storage
    /// in a `StreamGateRecord`.
    fn reason(&self) -> Option<String> {
        match self {
            Self::Pass { reason } => reason.clone(),
            Self::Block { reason } => reason.clone(),
        }
    }
}
216
/// Mutable per-stream runtime state.
#[derive(Debug, Default)]
struct StreamRuntimeState {
    // Events admitted but not yet consumed by an emitted window.
    pending: VecDeque<TriggerEvent>,
    // Timestamps of recently admitted events, used for throttling.
    throttle_hits: VecDeque<OffsetDateTime>,
    // Counters exposed via `StreamTriggerRuntime::snapshot`.
    status: StreamStatusSnapshot,
}
223
/// Windows, throttles, gates, and dispatches events for a single stream.
///
/// All observable activity (admits, drops, windows, gate decisions) is also
/// appended to the event log for observability and replay.
pub struct StreamTriggerRuntime {
    config: StreamTriggerConfig,
    event_log: std::sync::Arc<AnyEventLog>,
    dispatcher: Dispatcher,
    state: StreamRuntimeState,
}
230
impl StreamTriggerRuntime {
    /// Builds a runtime for the given stream configuration.
    ///
    /// Validates the config up front and seeds the status snapshot with the
    /// stream id so every status event carries it.
    ///
    /// # Errors
    /// `DispatchError::Local` when the configuration is invalid.
    pub fn new(
        config: StreamTriggerConfig,
        event_log: std::sync::Arc<AnyEventLog>,
        dispatcher: Dispatcher,
    ) -> Result<Self, DispatchError> {
        config.validate()?;
        let status = StreamStatusSnapshot {
            stream_id: config.stream_id.clone(),
            ..Default::default()
        };
        Ok(Self {
            config,
            event_log,
            dispatcher,
            state: StreamRuntimeState {
                status,
                ..Default::default()
            },
        })
    }

    /// Current counters plus the live pending-queue length.
    pub fn snapshot(&self) -> StreamStatusSnapshot {
        let mut snapshot = self.state.status.clone();
        // `pending_events` is derived on read; the stored field is not
        // maintained as events are admitted/drained.
        snapshot.pending_events = self.state.pending.len();
        snapshot
    }

    /// Pushes an event through the pipeline with an always-passing gate.
    pub async fn push_event(
        &mut self,
        event: TriggerEvent,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.push_event_with_gate(event, |_| StreamGateOutcome::Pass { reason: None })
            .await
    }

    /// Full ingestion pipeline: log receipt → throttle → admit (debounce /
    /// backpressure) → emit any full windows → gate each window → dispatch.
    ///
    /// Returns the dispatch outcomes of every window that passed its gate
    /// (empty when the event was throttled or no window became ready).
    pub async fn push_event_with_gate(
        &mut self,
        event: TriggerEvent,
        gate: impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        // Receipt is logged before any filtering so even throttled/dropped
        // events leave a trace on the status topic.
        self.append_status_event(
            "stream_event_received",
            json!({
                "stream_id": self.config.stream_id,
                "event_id": event.id.0,
                "dedupe_key": event.dedupe_key,
                "provider": event.provider.as_str(),
                "kind": event.kind,
            }),
        )
        .await?;

        if !self.apply_throttle(&event).await? {
            return Ok(Vec::new());
        }

        self.admit_event(event).await?;
        let windows = self.emit_ready_windows().await?;
        let mut outcomes = Vec::new();
        for (envelope, window_event) in windows {
            if !self.evaluate_gate(&envelope, &gate).await? {
                continue;
            }
            outcomes.extend(self.dispatcher.dispatch_event(window_event).await?);
        }
        Ok(outcomes)
    }

    /// Applies debounce and backpressure, then enqueues the event.
    ///
    /// Debounce runs first, so an evicted duplicate may free the slot the
    /// new event needs. Depending on the overflow policy a full queue either
    /// drops the new event, drops the oldest queued event, or dead-letters
    /// the new event.
    async fn admit_event(&mut self, event: TriggerEvent) -> Result<(), DispatchError> {
        if self.config.flow.debounce_by_dedupe_key {
            let before = self.state.pending.len();
            self.state
                .pending
                .retain(|pending| pending.dedupe_key != event.dedupe_key);
            if self.state.pending.len() != before {
                self.append_status_event(
                    "stream_event_debounced",
                    json!({
                        "stream_id": self.config.stream_id,
                        "dedupe_key": event.dedupe_key,
                    }),
                )
                .await?;
            }
        }

        if self.state.pending.len() >= self.config.backpressure.max_pending_events {
            match self.config.backpressure.overflow {
                StreamOverflowPolicy::DropNewest => {
                    self.state.status.dropped_events =
                        self.state.status.dropped_events.saturating_add(1);
                    self.append_status_event(
                        "stream_event_dropped",
                        json!({
                            "stream_id": self.config.stream_id,
                            "event_id": event.id.0,
                            "reason": "backpressure_drop_newest",
                            "max_pending_events": self.config.backpressure.max_pending_events,
                        }),
                    )
                    .await?;
                    // New event is discarded; nothing is enqueued.
                    return Ok(());
                }
                StreamOverflowPolicy::DropOldest => {
                    // Evict the oldest event and fall through to enqueue
                    // the new one below.
                    if let Some(dropped) = self.state.pending.pop_front() {
                        self.state.status.dropped_events =
                            self.state.status.dropped_events.saturating_add(1);
                        self.append_status_event(
                            "stream_event_dropped",
                            json!({
                                "stream_id": self.config.stream_id,
                                "event_id": dropped.id.0,
                                "reason": "backpressure_drop_oldest",
                                "max_pending_events": self.config.backpressure.max_pending_events,
                            }),
                        )
                        .await?;
                    }
                }
                StreamOverflowPolicy::DeadLetterNewest => {
                    self.dead_letter_event(&event, "backpressure_queue_full")
                        .await?;
                    return Ok(());
                }
            }
        }

        self.state.status.admitted_events = self.state.status.admitted_events.saturating_add(1);
        self.state.pending.push_back(event);
        Ok(())
    }

    /// Emits every window that is currently full.
    ///
    /// For fixed/tumbling modes the drain clears the queue, so at most one
    /// window per call; a sliding drain only removes `step` events, so one
    /// push can yield several overlapping windows. Each envelope is logged
    /// before dispatch is attempted.
    async fn emit_ready_windows(
        &mut self,
    ) -> Result<Vec<(StreamWindowEnvelope, TriggerEvent)>, DispatchError> {
        let mut windows = Vec::new();
        while self.state.pending.len() >= self.config.window.size {
            let members = self
                .state
                .pending
                .iter()
                .take(self.config.window.size)
                .cloned()
                .collect::<Vec<_>>();
            let envelope = self.window_envelope(&members);
            let event = self.window_event(&members, &envelope)?;
            self.append_window_event(&envelope).await?;
            self.state.status.emitted_windows = self.state.status.emitted_windows.saturating_add(1);
            windows.push((envelope, event));
            self.config.window.drain_after_emit(&mut self.state.pending);
        }
        Ok(windows)
    }

    /// Builds the envelope for a window of member events.
    ///
    /// `window_id` is derived deterministically from the stream id and the
    /// members' dedupe keys, which is what makes gate-decision caching and
    /// replay possible. Timestamps prefer `occurred_at` and fall back to
    /// `received_at` of the first/last member.
    fn window_envelope(&self, members: &[TriggerEvent]) -> StreamWindowEnvelope {
        let event_ids = members
            .iter()
            .map(|event| event.id.0.clone())
            .collect::<Vec<_>>();
        let dedupe_keys = members
            .iter()
            .map(|event| event.dedupe_key.clone())
            .collect::<Vec<_>>();
        let first_occurred_at = members
            .iter()
            .filter_map(|event| event.occurred_at)
            .min()
            .or_else(|| members.first().map(|event| event.received_at));
        let last_occurred_at = members
            .iter()
            .filter_map(|event| event.occurred_at)
            .max()
            .or_else(|| members.last().map(|event| event.received_at));
        let source = dedupe_keys.join("+");
        StreamWindowEnvelope {
            stream_id: self.config.stream_id.clone(),
            window_id: format!("stream_window:{}:{source}", self.config.stream_id),
            event_ids,
            dedupe_keys,
            first_occurred_at,
            last_occurred_at,
            replay_of_event_id: self
                .config
                .gate
                .as_ref()
                .and_then(|gate| gate.replay_of_event_id.clone()),
        }
    }

    /// Synthesizes the single `TriggerEvent` that represents a window.
    ///
    /// The event is cloned from the first member with a fresh id/trace id,
    /// the kind suffixed with `.window`, the window id as dedupe key,
    /// provenance headers (`harn_stream_*`), and all members serialized into
    /// `batch`.
    ///
    /// # Errors
    /// `DispatchError::Local` for an empty window (should be unreachable
    /// given the caller's size check) and `DispatchError::Serde` when a
    /// member fails to serialize.
    fn window_event(
        &self,
        members: &[TriggerEvent],
        envelope: &StreamWindowEnvelope,
    ) -> Result<TriggerEvent, DispatchError> {
        let first = members
            .first()
            .ok_or_else(|| DispatchError::Local("stream window cannot be empty".to_string()))?;
        let mut headers = first.headers.clone();
        headers.insert("harn_stream_id".to_string(), envelope.stream_id.clone());
        headers.insert(
            "harn_stream_window_id".to_string(),
            envelope.window_id.clone(),
        );
        headers.insert(
            "harn_stream_source_event_ids".to_string(),
            envelope.event_ids.join(","),
        );
        let batch = members
            .iter()
            .map(|event| serde_json::to_value(event).map_err(|error| error.to_string()))
            .collect::<Result<Vec<_>, _>>()
            .map_err(DispatchError::Serde)?;
        let mut window_event = first.clone();
        window_event.id = TriggerEventId::new();
        window_event.kind = format!("{}.window", first.kind);
        window_event.dedupe_key = envelope.window_id.clone();
        window_event.trace_id = TraceId::new();
        window_event.headers = headers;
        window_event.batch = Some(batch);
        Ok(window_event)
    }

    /// Decides whether a window may dispatch, preferring a cached decision.
    ///
    /// With no gate configured every window passes. Otherwise the gate topic
    /// is searched for a prior decision under the same gate id + cache key;
    /// a hit is re-appended with `cached: true` and reused WITHOUT invoking
    /// the gate closure. Only fresh blocks emit a
    /// `stream_window_gate_blocked` status event; cached blocks do not.
    async fn evaluate_gate(
        &mut self,
        envelope: &StreamWindowEnvelope,
        gate: &impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
    ) -> Result<bool, DispatchError> {
        let Some(config) = self.config.gate.clone() else {
            return Ok(true);
        };
        let cache_key = format!("{}:{}", config.cache_key, envelope.dedupe_keys.join("+"));
        if let Some(record) = self.read_gate_record(&config.gate_id, &cache_key).await? {
            self.append_gate_record(StreamGateRecord {
                cached: true,
                ..record.clone()
            })
            .await?;
            if record.result {
                self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
            } else {
                self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
            }
            return Ok(record.result);
        }

        let outcome = gate(envelope);
        let record = StreamGateRecord {
            gate_id: config.gate_id,
            cache_key,
            window_id: envelope.window_id.clone(),
            result: outcome.result(),
            cached: false,
            reason: outcome.reason(),
            replay_of_event_id: config.replay_of_event_id,
        };
        self.append_gate_record(record.clone()).await?;
        if record.result {
            self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
        } else {
            self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
            self.append_status_event(
                "stream_window_gate_blocked",
                json!({
                    "stream_id": self.config.stream_id,
                    "window_id": envelope.window_id,
                    "gate_id": record.gate_id,
                    "reason": record.reason,
                }),
            )
            .await?;
        }
        Ok(record.result)
    }

    /// Rolling-window rate limit; returns `false` when the event must be
    /// rejected.
    ///
    /// "Now" is taken from the event's own timestamps (`occurred_at`, else
    /// `received_at`), so throttling is deterministic with respect to the
    /// event data rather than the wall clock. Throttled events count toward
    /// `dropped_events`.
    async fn apply_throttle(&mut self, event: &TriggerEvent) -> Result<bool, DispatchError> {
        let Some(throttle) = self.config.flow.throttle.clone() else {
            return Ok(true);
        };
        let now = event.occurred_at.unwrap_or(event.received_at);
        trim_hits(&mut self.state.throttle_hits, now, throttle.period);
        if self.state.throttle_hits.len() >= throttle.max {
            self.state.status.dropped_events = self.state.status.dropped_events.saturating_add(1);
            self.append_status_event(
                "stream_event_throttled",
                json!({
                    "stream_id": self.config.stream_id,
                    "event_id": event.id.0,
                    "max": throttle.max,
                    "period_ms": throttle.period.as_millis(),
                }),
            )
            .await?;
            return Ok(false);
        }
        self.state.throttle_hits.push_back(now);
        Ok(true)
    }

    /// Routes an event to the DLQ topic (with the full event payload) and
    /// mirrors a lighter notice onto the status topic.
    async fn dead_letter_event(
        &mut self,
        event: &TriggerEvent,
        reason: &str,
    ) -> Result<(), DispatchError> {
        self.state.status.dead_lettered_events =
            self.state.status.dead_lettered_events.saturating_add(1);
        self.append_topic_event(
            TRIGGER_DLQ_TOPIC,
            "stream_dead_lettered",
            json!({
                "stream_id": self.config.stream_id,
                "event": event,
                "reason": reason,
                "pending_events": self.state.pending.len(),
                "max_pending_events": self.config.backpressure.max_pending_events,
            }),
        )
        .await?;
        self.append_status_event(
            "stream_event_dead_lettered",
            json!({
                "stream_id": self.config.stream_id,
                "event_id": event.id.0,
                "reason": reason,
            }),
        )
        .await
    }

    /// Records an emitted window on the windows topic and mirrors a summary
    /// onto the shared triggers lifecycle topic.
    async fn append_window_event(
        &self,
        envelope: &StreamWindowEnvelope,
    ) -> Result<(), DispatchError> {
        self.append_topic_event(
            TRIGGER_STREAM_WINDOWS_TOPIC,
            "stream_window_emitted",
            serde_json::to_value(envelope)
                .map_err(|error| DispatchError::Serde(error.to_string()))?,
        )
        .await?;
        self.append_topic_event(
            TRIGGERS_LIFECYCLE_TOPIC,
            "StreamWindowEmitted",
            json!({
                "stream_id": envelope.stream_id,
                "window_id": envelope.window_id,
                "event_ids": envelope.event_ids,
                "replay_of_event_id": envelope.replay_of_event_id,
            }),
        )
        .await
    }

    /// Appends a gate decision to the gate topic (the decision cache).
    async fn append_gate_record(&self, record: StreamGateRecord) -> Result<(), DispatchError> {
        self.append_topic_event(
            TRIGGER_STREAM_GATE_TOPIC,
            "stream_gate_decision",
            serde_json::to_value(record)
                .map_err(|error| DispatchError::Serde(error.to_string()))?,
        )
        .await
    }

    /// Looks up the most recent gate decision for `gate_id` + `cache_key`.
    ///
    /// Scans the entire gate topic newest-first (O(topic length) per call)
    /// and ignores records that fail to deserialize.
    async fn read_gate_record(
        &self,
        gate_id: &str,
        cache_key: &str,
    ) -> Result<Option<StreamGateRecord>, DispatchError> {
        let topic =
            Topic::new(TRIGGER_STREAM_GATE_TOPIC).expect("static stream gate topic is valid");
        let records = self
            .event_log
            .read_range(&topic, None, usize::MAX)
            .await
            .map_err(DispatchError::from)?;
        Ok(records
            .into_iter()
            .rev()
            .filter(|(_, event)| event.kind == "stream_gate_decision")
            .filter_map(|(_, event)| serde_json::from_value::<StreamGateRecord>(event.payload).ok())
            .find(|record| record.gate_id == gate_id && record.cache_key == cache_key))
    }

    /// Appends to the status topic, injecting the current snapshot under a
    /// `status` key when the payload is a JSON object.
    async fn append_status_event(
        &self,
        kind: &str,
        mut payload: JsonValue,
    ) -> Result<(), DispatchError> {
        if let JsonValue::Object(ref mut object) = payload {
            object.insert("status".to_string(), json!(self.snapshot()));
        }
        self.append_topic_event(TRIGGER_STREAM_STATUS_TOPIC, kind, payload)
            .await
    }

    /// Low-level append of a `LogEvent` to an arbitrary (statically known)
    /// topic; the returned sequence number is discarded.
    async fn append_topic_event(
        &self,
        topic: &str,
        kind: &str,
        payload: JsonValue,
    ) -> Result<(), DispatchError> {
        self.event_log
            .append(
                &Topic::new(topic).expect("static stream trigger topic is valid"),
                LogEvent::new(kind, payload),
            )
            .await
            .map(|_| ())
            .map_err(DispatchError::from)
    }
}
642
643pub fn stream_window_summary(event: &TriggerEvent) -> Option<StreamWindowEnvelope> {
644 let batch = event.batch.as_ref()?;
645 let event_ids = batch
646 .iter()
647 .filter_map(|member| member.get("id"))
648 .filter_map(JsonValue::as_str)
649 .map(ToString::to_string)
650 .collect::<Vec<_>>();
651 Some(StreamWindowEnvelope {
652 stream_id: event
653 .headers
654 .get("harn_stream_id")
655 .cloned()
656 .unwrap_or_default(),
657 window_id: event
658 .headers
659 .get("harn_stream_window_id")
660 .cloned()
661 .unwrap_or_else(|| event.dedupe_key.clone()),
662 event_ids,
663 dedupe_keys: batch
664 .iter()
665 .filter_map(|member| member.get("dedupe_key"))
666 .filter_map(JsonValue::as_str)
667 .map(ToString::to_string)
668 .collect(),
669 first_occurred_at: None,
670 last_occurred_at: None,
671 replay_of_event_id: None,
672 })
673}
674
675pub fn stream_fixture_event(
676 provider: impl Into<String>,
677 kind: impl Into<String>,
678 stream: impl Into<String>,
679 offset: u64,
680 payload: JsonValue,
681) -> TriggerEvent {
682 let provider = provider.into();
683 let stream = stream.into();
684 let kind = kind.into();
685 let provider_id = crate::triggers::ProviderId::from(provider.clone());
686 let raw = json!({
687 "event": kind,
688 "stream": stream,
689 "offset": offset,
690 "value": payload,
691 });
692 TriggerEvent::new(
693 provider_id.clone(),
694 kind.clone(),
695 Some(clock::now_utc()),
696 format!("{stream}:{offset}"),
697 None,
698 BTreeMap::new(),
699 ProviderPayload::normalize(&provider_id, &kind, &BTreeMap::new(), raw).unwrap_or_else(
700 |_| {
701 ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Webhook(
702 crate::triggers::GenericWebhookPayload {
703 source: Some("stream_fixture".to_string()),
704 content_type: Some("application/json".to_string()),
705 raw: json!({}),
706 },
707 ))
708 },
709 ),
710 SignatureStatus::Unsigned,
711 )
712}
713
714fn trim_hits(hits: &mut VecDeque<OffsetDateTime>, now: OffsetDateTime, period: Duration) {
715 let period = time::Duration::try_from(period).unwrap_or_default();
716 while hits.front().is_some_and(|hit| now - *hit >= period) {
717 hits.pop_front();
718 }
719}
720
/// Serde default for `StreamWindowConfig::step` when absent from config.
fn default_window_step() -> usize {
    1
}
724
725mod duration_millis {
726 use std::time::Duration;
727
728 use serde::{Deserialize, Deserializer, Serializer};
729
730 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
731 where
732 S: Serializer,
733 {
734 serializer.serialize_u64(duration.as_millis() as u64)
735 }
736
737 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
738 where
739 D: Deserializer<'de>,
740 {
741 Ok(Duration::from_millis(u64::deserialize(deserializer)?))
742 }
743}
744
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;
    use std::sync::Arc;

    use serde_json::json;

    use crate::event_log::{install_default_for_base_dir, EventLog};
    use crate::stdlib::register_vm_stdlib;
    use crate::triggers::dispatcher::RetryPolicy;
    use crate::triggers::TriggerRetryConfig;
    use crate::triggers::{
        clear_trigger_registry, install_manifest_triggers, ProviderId, TriggerBindingSource,
        TriggerBindingSpec, TriggerHandlerSpec,
    };
    use crate::trust_graph::AutonomyTier;
    use crate::vm::Vm;

    use super::*;

    /// Reads every record currently on `topic` from the event log.
    async fn read_topic(
        log: Arc<AnyEventLog>,
        topic: &str,
    ) -> Vec<(u64, crate::event_log::LogEvent)> {
        log.read_range(&Topic::new(topic).unwrap(), None, usize::MAX)
            .await
            .unwrap()
    }

    /// Builds a full dispatcher fixture: fresh temp dir + event log, a VM
    /// with the given harn `source` compiled, and a manifest trigger bound
    /// to the `local_fn` export matching `issues.opened.window` events.
    /// Returns the temp dir so it lives as long as the fixture.
    async fn stream_dispatcher_fixture(
        source: &str,
    ) -> (tempfile::TempDir, Arc<AnyEventLog>, Dispatcher) {
        // Tests share thread-local registries; reset for isolation.
        crate::reset_thread_local_state();
        clear_trigger_registry();
        let dir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(dir.path()).expect("event log");
        let lib_path = dir.path().join("lib.harn");
        std::fs::write(&lib_path, source).expect("write harn source");

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(dir.path());
        let exports = vm
            .load_module_exports(&lib_path)
            .await
            .expect("load stream handler");
        let handler = exports.get("local_fn").expect("local_fn export").clone();
        install_manifest_triggers(vec![TriggerBindingSpec {
            id: "stream-window-handler".to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "stream".to_string(),
            provider: ProviderId::from("kafka"),
            autonomy_tier: AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Local {
                raw: "local_fn".to_string(),
                closure: handler,
            },
            dispatch_priority: crate::triggers::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::new(1, RetryPolicy::Linear { delay_ms: 1 }),
            match_events: vec!["issues.opened.window".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::triggers::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: "stream-window-handler:v1".to_string(),
        }])
        .await
        .expect("install stream trigger");

        (dir, log.clone(), Dispatcher::with_event_log(vm, log))
    }

    /// End-to-end: a tumbling window of 2 dispatches once, the gate runs
    /// exactly once, and replaying the same dedupe keys reuses the cached
    /// gate decision without invoking the gate closure again.
    #[tokio::test(flavor = "current_thread")]
    async fn chat_stream_windows_gate_and_dispatch_deterministically() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let (_dir, log, dispatcher) = stream_dispatcher_fixture(
                    r#"
import "std/triggers"

pub fn local_fn(event: TriggerEvent) -> int {
    return len(event.batch ?? [])
}
"#,
                )
                .await;

                let mut runtime = StreamTriggerRuntime::new(
                    StreamTriggerConfig {
                        stream_id: "chat:triage".to_string(),
                        window: StreamWindowConfig::tumbling(2),
                        backpressure: StreamBackpressureConfig::default(),
                        flow: StreamFlowConfig::default(),
                        gate: Some(StreamGateConfig {
                            gate_id: "chat-llm-gate".to_string(),
                            cache_key: "chat:v1".to_string(),
                            replay_of_event_id: None,
                        }),
                    },
                    log.clone(),
                    dispatcher,
                )
                .unwrap();

                // First pass: two events fill the window; the handler
                // returns the batch length (2) and the gate fires once.
                let calls = Rc::new(Cell::new(0));
                for offset in 0..2 {
                    let calls = calls.clone();
                    let outcomes = runtime
                        .push_event_with_gate(
                            stream_fixture_event(
                                "kafka",
                                "issues.opened",
                                "chat",
                                offset,
                                json!({"text": format!("message {offset}")}),
                            ),
                            move |_| {
                                calls.set(calls.get() + 1);
                                StreamGateOutcome::Pass {
                                    reason: Some("fixture-positive".to_string()),
                                }
                            },
                        )
                        .await
                        .unwrap();
                    if offset == 1 {
                        assert_eq!(outcomes.len(), 1);
                        assert_eq!(outcomes[0].result, Some(json!(2)));
                    } else {
                        assert!(outcomes.is_empty());
                    }
                }
                assert_eq!(calls.get(), 1);

                let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
                assert_eq!(
                    gate_events
                        .iter()
                        .filter(|(_, event)| event.kind == "stream_gate_decision")
                        .count(),
                    1
                );

                // Replay: same dedupe keys produce the same cache key, so
                // the cached pass is reused and the Block closure never runs.
                let first_window_id = gate_events[0].1.payload["window_id"]
                    .as_str()
                    .unwrap()
                    .to_string();
                runtime.config.gate.as_mut().unwrap().replay_of_event_id =
                    Some(first_window_id.clone());

                for offset in 0..2 {
                    let calls = calls.clone();
                    runtime
                        .push_event_with_gate(
                            stream_fixture_event(
                                "kafka",
                                "issues.opened",
                                "chat",
                                offset,
                                json!({"text": format!("message {offset}")}),
                            ),
                            move |_| {
                                calls.set(calls.get() + 1);
                                StreamGateOutcome::Block {
                                    reason: Some("should-not-run".to_string()),
                                }
                            },
                        )
                        .await
                        .unwrap();
                }
                assert_eq!(calls.get(), 1);
                let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
                assert!(gate_events.iter().any(|(_, event)| {
                    event.kind == "stream_gate_decision"
                        && event.payload["cached"] == json!(true)
                        && event.payload["window_id"] == json!(first_window_id)
                }));
            })
            .await;
    }

    /// With `max_pending_events = 1` and the dead-letter policy, the second
    /// event overflows the queue and lands on the DLQ topic.
    #[tokio::test(flavor = "current_thread")]
    async fn issue_feed_backpressure_dead_letters_overflow() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().unwrap();
                let log = install_default_for_base_dir(dir.path()).unwrap();
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                let dispatcher = Dispatcher::with_event_log(vm, log.clone());
                let mut runtime = StreamTriggerRuntime::new(
                    StreamTriggerConfig {
                        stream_id: "issue-feed".to_string(),
                        window: StreamWindowConfig::fixed(3),
                        backpressure: StreamBackpressureConfig {
                            max_pending_events: 1,
                            overflow: StreamOverflowPolicy::DeadLetterNewest,
                        },
                        flow: StreamFlowConfig::default(),
                        gate: None,
                    },
                    log.clone(),
                    dispatcher,
                )
                .unwrap();

                runtime
                    .push_event(stream_fixture_event(
                        "kafka",
                        "issues.opened",
                        "issues",
                        1,
                        json!({"number": 1}),
                    ))
                    .await
                    .unwrap();
                runtime
                    .push_event(stream_fixture_event(
                        "kafka",
                        "issues.opened",
                        "issues",
                        2,
                        json!({"number": 2}),
                    ))
                    .await
                    .unwrap();

                let snapshot = runtime.snapshot();
                assert_eq!(snapshot.pending_events, 1);
                assert_eq!(snapshot.dead_lettered_events, 1);

                let dlq = read_topic(log.clone(), TRIGGER_DLQ_TOPIC).await;
                assert!(dlq.iter().any(|(_, event)| {
                    event.kind == "stream_dead_lettered"
                        && event.payload["reason"] == json!("backpressure_queue_full")
                }));
            })
            .await;
    }

    /// A sliding window (size 2, step 1) over 3 events yields two
    /// overlapping windows: [1,2] and [2,3].
    #[tokio::test(flavor = "current_thread")]
    async fn sliding_windows_emit_overlapping_batches() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let (_dir, log, dispatcher) = stream_dispatcher_fixture(
                    r#"
import "std/triggers"

pub fn local_fn(event: TriggerEvent) -> int {
    return len(event.batch ?? [])
}
"#,
                )
                .await;
                let mut runtime = StreamTriggerRuntime::new(
                    StreamTriggerConfig {
                        stream_id: "sliding".to_string(),
                        window: StreamWindowConfig::sliding(2, 1),
                        backpressure: StreamBackpressureConfig::default(),
                        flow: StreamFlowConfig::default(),
                        gate: None,
                    },
                    log.clone(),
                    dispatcher,
                )
                .unwrap();

                let mut dispatched = 0;
                for offset in 1..=3 {
                    dispatched += runtime
                        .push_event(stream_fixture_event(
                            "kafka",
                            "issues.opened",
                            "sliding",
                            offset,
                            json!({"offset": offset}),
                        ))
                        .await
                        .unwrap()
                        .len();
                }

                assert_eq!(dispatched, 2);
                assert_eq!(runtime.snapshot().emitted_windows, 2);
                let windows = read_topic(log, TRIGGER_STREAM_WINDOWS_TOPIC).await;
                assert_eq!(windows.len(), 2);
            })
            .await;
    }
}