// harn_vm/triggers/streaming.rs
1use std::collections::{BTreeMap, VecDeque};
2use std::time::Duration;
3
4use serde::{Deserialize, Serialize};
5use serde_json::{json, Value as JsonValue};
6use time::OffsetDateTime;
7
8use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
9use crate::triggers::test_util::clock;
10use crate::triggers::{
11    Dispatcher, ProviderPayload, SignatureStatus, TraceId, TriggerEvent, TriggerEventId,
12};
13
14use super::{DispatchError, DispatchOutcome, TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_DLQ_TOPIC};
15
/// Event-log topic receiving one `stream_window_emitted` record per window envelope.
pub const TRIGGER_STREAM_WINDOWS_TOPIC: &str = "trigger.stream.windows";
/// Event-log topic for per-event diagnostics (received, dropped, throttled,
/// debounced, dead-lettered, gate-blocked); each record embeds a status snapshot.
pub const TRIGGER_STREAM_STATUS_TOPIC: &str = "trigger.stream.status";
/// Event-log topic holding `stream_gate_decision` records; doubles as the gate
/// decision cache consulted by `read_gate_record`.
pub const TRIGGER_STREAM_GATE_TOPIC: &str = "trigger.stream.gates";
19
/// How pending events are grouped into windows and how the queue advances
/// after a window is emitted (see `StreamWindowConfig::drain_after_emit`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamWindowMode {
    /// Emit a window of `size` events, then clear the pending queue.
    Fixed,
    /// Same drain behavior as `Fixed` in this implementation: emit, then clear.
    Tumbling,
    /// Emit overlapping windows, advancing the queue by `step` events each time.
    Sliding,
}
27
/// Windowing parameters for a stream trigger.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamWindowConfig {
    pub mode: StreamWindowMode,
    /// Number of pending events required before a window is emitted; must be > 0.
    pub size: usize,
    /// How far the queue advances after a sliding window; must be > 0.
    /// Defaults to 1 when absent from serialized config (`default_window_step`).
    #[serde(default = "default_window_step")]
    pub step: usize,
}
35
36impl StreamWindowConfig {
37    pub fn fixed(size: usize) -> Self {
38        Self {
39            mode: StreamWindowMode::Fixed,
40            size,
41            step: size.max(1),
42        }
43    }
44
45    pub fn tumbling(size: usize) -> Self {
46        Self {
47            mode: StreamWindowMode::Tumbling,
48            size,
49            step: size.max(1),
50        }
51    }
52
53    pub fn sliding(size: usize, step: usize) -> Self {
54        Self {
55            mode: StreamWindowMode::Sliding,
56            size,
57            step: step.max(1),
58        }
59    }
60
61    fn validate(&self) -> Result<(), DispatchError> {
62        if self.size == 0 {
63            return Err(DispatchError::Local(
64                "stream window size must be positive".to_string(),
65            ));
66        }
67        if self.step == 0 {
68            return Err(DispatchError::Local(
69                "stream window step must be positive".to_string(),
70            ));
71        }
72        Ok(())
73    }
74
75    fn drain_after_emit(&self, pending: &mut VecDeque<TriggerEvent>) {
76        match self.mode {
77            StreamWindowMode::Fixed | StreamWindowMode::Tumbling => pending.clear(),
78            StreamWindowMode::Sliding => {
79                let remove = self.step.min(pending.len());
80                pending.drain(..remove);
81            }
82        }
83    }
84}
85
/// What to do with an arriving event once the pending queue is full.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamOverflowPolicy {
    /// Silently count and drop the incoming event.
    DropNewest,
    /// Pop and drop the oldest pending event to make room for the new one.
    DropOldest,
    /// Send the incoming event to the trigger dead-letter queue topic.
    DeadLetterNewest,
}
93
/// Bounds the pending-event queue and selects the overflow policy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamBackpressureConfig {
    /// Maximum queued events before `overflow` kicks in; must be > 0.
    pub max_pending_events: usize,
    pub overflow: StreamOverflowPolicy,
}
99
impl Default for StreamBackpressureConfig {
    /// Default: a 1024-event queue that dead-letters overflowing arrivals,
    /// so no event is silently lost unless explicitly configured.
    fn default() -> Self {
        Self {
            max_pending_events: 1024,
            overflow: StreamOverflowPolicy::DeadLetterNewest,
        }
    }
}
108
/// Rate limit: at most `max` admitted events per rolling `period`.
/// The period is measured in event time (see `apply_throttle`), not wall clock.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamThrottleConfig {
    pub max: usize,
    /// Serialized as integer milliseconds via the `duration_millis` adapter.
    #[serde(with = "duration_millis")]
    pub period: Duration,
}
115
/// Optional flow-control features applied before events enter the queue.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFlowConfig {
    /// When set, an arriving event evicts pending events sharing its dedupe
    /// key, so only the newest occurrence of a key sits in the queue.
    pub debounce_by_dedupe_key: bool,
    /// Optional rolling rate limit; `None` disables throttling.
    pub throttle: Option<StreamThrottleConfig>,
}
121
/// Configuration for the optional per-window gate check.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateConfig {
    pub gate_id: String,
    /// Prefix combined with the window's dedupe keys to form the cache key
    /// under which gate decisions are stored and replayed.
    pub cache_key: String,
    /// When set, stamped onto emitted window envelopes and gate records to
    /// mark them as replays of an earlier event.
    pub replay_of_event_id: Option<String>,
}
128
/// Full configuration for one stream trigger runtime.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamTriggerConfig {
    /// Identifier stamped onto every status/window/gate record; must be non-blank.
    pub stream_id: String,
    pub window: StreamWindowConfig,
    #[serde(default)]
    pub backpressure: StreamBackpressureConfig,
    #[serde(default)]
    pub flow: StreamFlowConfig,
    /// Optional gate; `None` means every window is dispatched.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub gate: Option<StreamGateConfig>,
}
140
141impl StreamTriggerConfig {
142    pub fn validate(&self) -> Result<(), DispatchError> {
143        if self.stream_id.trim().is_empty() {
144            return Err(DispatchError::Local(
145                "stream id must be a non-empty string".to_string(),
146            ));
147        }
148        self.window.validate()?;
149        if self.backpressure.max_pending_events == 0 {
150            return Err(DispatchError::Local(
151                "stream max_pending_events must be positive".to_string(),
152            ));
153        }
154        if let Some(throttle) = self.flow.throttle.as_ref() {
155            if throttle.max == 0 || throttle.period.is_zero() {
156                return Err(DispatchError::Local(
157                    "stream throttle requires positive max and period".to_string(),
158                ));
159            }
160        }
161        Ok(())
162    }
163}
164
/// Summary of one emitted window, logged to the windows topic and used for
/// gate evaluation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StreamWindowEnvelope {
    pub stream_id: String,
    /// Deterministic id built from the stream id and member dedupe keys.
    pub window_id: String,
    pub event_ids: Vec<String>,
    pub dedupe_keys: Vec<String>,
    /// Earliest `occurred_at` among members, falling back to the first
    /// member's `received_at`.
    pub first_occurred_at: Option<OffsetDateTime>,
    /// Latest `occurred_at` among members, falling back to the last
    /// member's `received_at`.
    pub last_occurred_at: Option<OffsetDateTime>,
    pub replay_of_event_id: Option<String>,
}
175
/// Monotonic counters for one stream, embedded in every status event.
/// Counters saturate rather than wrap (see the `saturating_add` call sites).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamStatusSnapshot {
    pub stream_id: String,
    /// Populated from the live queue length by `snapshot()`, not maintained
    /// incrementally.
    pub pending_events: usize,
    pub admitted_events: u64,
    /// Counts both backpressure drops and throttled events.
    pub dropped_events: u64,
    pub dead_lettered_events: u64,
    pub emitted_windows: u64,
    pub gate_passed: u64,
    pub gate_blocked: u64,
}
187
/// One gate decision, appended to the gate topic and replayed as a cache hit
/// when the same gate id + cache key is seen again.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamGateRecord {
    pub gate_id: String,
    pub cache_key: String,
    pub window_id: String,
    /// `true` = window passes the gate and is dispatched.
    pub result: bool,
    /// `true` when this record replays a previously logged decision.
    pub cached: bool,
    pub reason: Option<String>,
    pub replay_of_event_id: Option<String>,
}
198
/// Result returned by a gate callback for a window envelope.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamGateOutcome {
    /// Window may be dispatched; optional human-readable reason.
    Pass { reason: Option<String> },
    /// Window is suppressed; optional human-readable reason.
    Block { reason: Option<String> },
}
204
205impl StreamGateOutcome {
206    fn result(&self) -> bool {
207        matches!(self, Self::Pass { .. })
208    }
209
210    fn reason(&self) -> Option<String> {
211        match self {
212            Self::Pass { reason } | Self::Block { reason } => reason.clone(),
213        }
214    }
215}
216
/// Mutable per-stream runtime state.
#[derive(Debug, Default)]
struct StreamRuntimeState {
    /// Events admitted but not yet consumed by a window, oldest first.
    pending: VecDeque<TriggerEvent>,
    /// Timestamps of recently admitted events, used by the throttle window.
    throttle_hits: VecDeque<OffsetDateTime>,
    status: StreamStatusSnapshot,
}
223
/// Drives one configured stream: admits events, emits windows, evaluates the
/// optional gate, and dispatches window events through the trigger dispatcher.
pub struct StreamTriggerRuntime {
    config: StreamTriggerConfig,
    /// Shared event log used for status, window, gate, lifecycle and DLQ topics.
    event_log: std::sync::Arc<AnyEventLog>,
    dispatcher: Dispatcher,
    state: StreamRuntimeState,
}
230
impl StreamTriggerRuntime {
    /// Validates `config` and builds a runtime with empty queues and a status
    /// snapshot pre-seeded with the stream id.
    ///
    /// # Errors
    /// Returns `DispatchError::Local` when the configuration is invalid.
    pub fn new(
        config: StreamTriggerConfig,
        event_log: std::sync::Arc<AnyEventLog>,
        dispatcher: Dispatcher,
    ) -> Result<Self, DispatchError> {
        config.validate()?;
        let status = StreamStatusSnapshot {
            stream_id: config.stream_id.clone(),
            ..Default::default()
        };
        Ok(Self {
            config,
            event_log,
            dispatcher,
            state: StreamRuntimeState {
                status,
                ..Default::default()
            },
        })
    }

    /// Returns a copy of the counters with `pending_events` refreshed from the
    /// live queue length (the counter is not maintained incrementally).
    pub fn snapshot(&self) -> StreamStatusSnapshot {
        let mut snapshot = self.state.status.clone();
        snapshot.pending_events = self.state.pending.len();
        snapshot
    }

    /// Pushes an event through the pipeline with a gate that always passes.
    pub async fn push_event(
        &mut self,
        event: TriggerEvent,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.push_event_with_gate(event, |_| StreamGateOutcome::Pass { reason: None })
            .await
    }

    /// Full ingestion pipeline for one event:
    /// 1. log receipt to the status topic,
    /// 2. apply the throttle (throttled events are counted as dropped and
    ///    produce no outcomes),
    /// 3. admit the event (debounce + backpressure),
    /// 4. emit every window that became ready,
    /// 5. gate each window; passing windows are dispatched and their
    ///    outcomes collected.
    ///
    /// # Errors
    /// Propagates event-log append/read failures and dispatcher errors.
    pub async fn push_event_with_gate(
        &mut self,
        event: TriggerEvent,
        gate: impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.append_status_event(
            "stream_event_received",
            json!({
                "stream_id": self.config.stream_id,
                "event_id": event.id.0,
                "dedupe_key": event.dedupe_key,
                "provider": event.provider.as_str(),
                "kind": event.kind,
            }),
        )
        .await?;

        if !self.apply_throttle(&event).await? {
            // Throttled: already logged and counted; nothing to dispatch.
            return Ok(Vec::new());
        }

        self.admit_event(event).await?;
        let windows = self.emit_ready_windows().await?;
        let mut outcomes = Vec::new();
        for (envelope, window_event) in windows {
            if !self.evaluate_gate(&envelope, &gate).await? {
                // Blocked windows are recorded but never dispatched.
                continue;
            }
            outcomes.extend(self.dispatcher.dispatch_event(window_event).await?);
        }
        Ok(outcomes)
    }

    /// Applies debounce and backpressure, then enqueues the event.
    ///
    /// Debounce removes pending events sharing the new event's dedupe key, so
    /// the newest occurrence of a key wins.
    /// NOTE(review): with `DropNewest`/`DeadLetterNewest` on a full queue the
    /// debounced predecessor is already gone and the new event is also
    /// rejected, so the key disappears entirely — confirm this is intended.
    async fn admit_event(&mut self, event: TriggerEvent) -> Result<(), DispatchError> {
        if self.config.flow.debounce_by_dedupe_key {
            let before = self.state.pending.len();
            self.state
                .pending
                .retain(|pending| pending.dedupe_key != event.dedupe_key);
            if self.state.pending.len() != before {
                self.append_status_event(
                    "stream_event_debounced",
                    json!({
                        "stream_id": self.config.stream_id,
                        "dedupe_key": event.dedupe_key,
                    }),
                )
                .await?;
            }
        }

        if self.state.pending.len() >= self.config.backpressure.max_pending_events {
            match self.config.backpressure.overflow {
                StreamOverflowPolicy::DropNewest => {
                    self.state.status.dropped_events =
                        self.state.status.dropped_events.saturating_add(1);
                    self.append_status_event(
                        "stream_event_dropped",
                        json!({
                            "stream_id": self.config.stream_id,
                            "event_id": event.id.0,
                            "reason": "backpressure_drop_newest",
                            "max_pending_events": self.config.backpressure.max_pending_events,
                        }),
                    )
                    .await?;
                    // New event rejected; queue untouched.
                    return Ok(());
                }
                StreamOverflowPolicy::DropOldest => {
                    // Evict the oldest event to make room; fall through to admit.
                    if let Some(dropped) = self.state.pending.pop_front() {
                        self.state.status.dropped_events =
                            self.state.status.dropped_events.saturating_add(1);
                        self.append_status_event(
                            "stream_event_dropped",
                            json!({
                                "stream_id": self.config.stream_id,
                                "event_id": dropped.id.0,
                                "reason": "backpressure_drop_oldest",
                                "max_pending_events": self.config.backpressure.max_pending_events,
                            }),
                        )
                        .await?;
                    }
                }
                StreamOverflowPolicy::DeadLetterNewest => {
                    self.dead_letter_event(&event, "backpressure_queue_full")
                        .await?;
                    return Ok(());
                }
            }
        }

        self.state.status.admitted_events = self.state.status.admitted_events.saturating_add(1);
        self.state.pending.push_back(event);
        Ok(())
    }

    /// Emits every window that is ready: while the queue holds at least
    /// `window.size` events, clones the first `size` into an envelope/event
    /// pair, logs the envelope, and advances the queue per the window mode.
    ///
    /// Fixed/tumbling modes clear the whole queue after an emit; since events
    /// are admitted one at a time the queue holds exactly `size` events here,
    /// so nothing extra is discarded in practice.
    async fn emit_ready_windows(
        &mut self,
    ) -> Result<Vec<(StreamWindowEnvelope, TriggerEvent)>, DispatchError> {
        let mut windows = Vec::new();
        while self.state.pending.len() >= self.config.window.size {
            let members = self
                .state
                .pending
                .iter()
                .take(self.config.window.size)
                .cloned()
                .collect::<Vec<_>>();
            let envelope = self.window_envelope(&members);
            let event = self.window_event(&members, &envelope)?;
            self.append_window_event(&envelope).await?;
            self.state.status.emitted_windows = self.state.status.emitted_windows.saturating_add(1);
            windows.push((envelope, event));
            self.config.window.drain_after_emit(&mut self.state.pending);
        }
        Ok(windows)
    }

    /// Builds the envelope for a window: member ids and dedupe keys, the
    /// min/max `occurred_at` bounds (falling back to the first/last member's
    /// `received_at`), and a window id derived only from the stream id and
    /// dedupe keys — deterministic, so replays produce the same id.
    fn window_envelope(&self, members: &[TriggerEvent]) -> StreamWindowEnvelope {
        let event_ids = members
            .iter()
            .map(|event| event.id.0.clone())
            .collect::<Vec<_>>();
        let dedupe_keys = members
            .iter()
            .map(|event| event.dedupe_key.clone())
            .collect::<Vec<_>>();
        let first_occurred_at = members
            .iter()
            .filter_map(|event| event.occurred_at)
            .min()
            .or_else(|| members.first().map(|event| event.received_at));
        let last_occurred_at = members
            .iter()
            .filter_map(|event| event.occurred_at)
            .max()
            .or_else(|| members.last().map(|event| event.received_at));
        let source = dedupe_keys.join("+");
        StreamWindowEnvelope {
            stream_id: self.config.stream_id.clone(),
            window_id: format!("stream_window:{}:{source}", self.config.stream_id),
            event_ids,
            dedupe_keys,
            first_occurred_at,
            last_occurred_at,
            replay_of_event_id: self
                .config
                .gate
                .as_ref()
                .and_then(|gate| gate.replay_of_event_id.clone()),
        }
    }

    /// Synthesizes the composite event dispatched for a window. It clones the
    /// first member, then: assigns fresh event/trace ids, suffixes the kind
    /// with `.window`, uses the window id as dedupe key, adds `harn_stream_*`
    /// headers, and attaches all members (JSON-serialized) as the batch.
    ///
    /// # Errors
    /// `DispatchError::Local` for an empty window (a bug in the caller) or
    /// `DispatchError::Serde` when a member fails to serialize.
    fn window_event(
        &self,
        members: &[TriggerEvent],
        envelope: &StreamWindowEnvelope,
    ) -> Result<TriggerEvent, DispatchError> {
        let first = members
            .first()
            .ok_or_else(|| DispatchError::Local("stream window cannot be empty".to_string()))?;
        let mut headers = first.headers.clone();
        headers.insert("harn_stream_id".to_string(), envelope.stream_id.clone());
        headers.insert(
            "harn_stream_window_id".to_string(),
            envelope.window_id.clone(),
        );
        headers.insert(
            "harn_stream_source_event_ids".to_string(),
            envelope.event_ids.join(","),
        );
        let batch = members
            .iter()
            .map(|event| serde_json::to_value(event).map_err(|error| error.to_string()))
            .collect::<Result<Vec<_>, _>>()
            .map_err(DispatchError::Serde)?;
        let mut window_event = first.clone();
        window_event.id = TriggerEventId::new();
        window_event.kind = format!("{}.window", first.kind);
        window_event.dedupe_key = envelope.window_id.clone();
        window_event.trace_id = TraceId::new();
        window_event.headers = headers;
        window_event.batch = Some(batch);
        Ok(window_event)
    }

    /// Evaluates the configured gate for one window. A previously logged
    /// decision with the same gate id + cache key is replayed (re-logged with
    /// `cached: true`) without invoking the callback; otherwise the callback
    /// runs and its decision is logged. Counters and, for fresh blocks, a
    /// status event are updated. Returns whether the window may be dispatched;
    /// returns `Ok(true)` immediately when no gate is configured.
    async fn evaluate_gate(
        &mut self,
        envelope: &StreamWindowEnvelope,
        gate: &impl Fn(&StreamWindowEnvelope) -> StreamGateOutcome,
    ) -> Result<bool, DispatchError> {
        let Some(config) = self.config.gate.clone() else {
            return Ok(true);
        };
        let cache_key = format!("{}:{}", config.cache_key, envelope.dedupe_keys.join("+"));
        if let Some(record) = self.read_gate_record(&config.gate_id, &cache_key).await? {
            self.append_gate_record(StreamGateRecord {
                cached: true,
                ..record.clone()
            })
            .await?;
            if record.result {
                self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
            } else {
                self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
            }
            return Ok(record.result);
        }

        let outcome = gate(envelope);
        let record = StreamGateRecord {
            gate_id: config.gate_id,
            cache_key,
            window_id: envelope.window_id.clone(),
            result: outcome.result(),
            cached: false,
            reason: outcome.reason(),
            replay_of_event_id: config.replay_of_event_id,
        };
        self.append_gate_record(record.clone()).await?;
        if record.result {
            self.state.status.gate_passed = self.state.status.gate_passed.saturating_add(1);
        } else {
            self.state.status.gate_blocked = self.state.status.gate_blocked.saturating_add(1);
            self.append_status_event(
                "stream_window_gate_blocked",
                json!({
                    "stream_id": self.config.stream_id,
                    "window_id": envelope.window_id,
                    "gate_id": record.gate_id,
                    "reason": record.reason,
                }),
            )
            .await?;
        }
        Ok(record.result)
    }

    /// Rolling rate limit keyed on event time (`occurred_at`, falling back to
    /// `received_at`) — not wall clock, which keeps replays deterministic.
    /// Returns `false` (and counts the event as dropped) when `max` events
    /// already landed within the trailing `period`; otherwise records the hit
    /// and returns `true`. No-op without a throttle config.
    async fn apply_throttle(&mut self, event: &TriggerEvent) -> Result<bool, DispatchError> {
        let Some(throttle) = self.config.flow.throttle.clone() else {
            return Ok(true);
        };
        let now = event.occurred_at.unwrap_or(event.received_at);
        trim_hits(&mut self.state.throttle_hits, now, throttle.period);
        if self.state.throttle_hits.len() >= throttle.max {
            self.state.status.dropped_events = self.state.status.dropped_events.saturating_add(1);
            self.append_status_event(
                "stream_event_throttled",
                json!({
                    "stream_id": self.config.stream_id,
                    "event_id": event.id.0,
                    "max": throttle.max,
                    "period_ms": throttle.period.as_millis(),
                }),
            )
            .await?;
            return Ok(false);
        }
        self.state.throttle_hits.push_back(now);
        Ok(true)
    }

    /// Records a rejected event: full payload to the trigger DLQ topic, then a
    /// lighter notification to the status topic, bumping the DLQ counter.
    async fn dead_letter_event(
        &mut self,
        event: &TriggerEvent,
        reason: &str,
    ) -> Result<(), DispatchError> {
        self.state.status.dead_lettered_events =
            self.state.status.dead_lettered_events.saturating_add(1);
        self.append_topic_event(
            TRIGGER_DLQ_TOPIC,
            "stream_dead_lettered",
            json!({
                "stream_id": self.config.stream_id,
                "event": event,
                "reason": reason,
                "pending_events": self.state.pending.len(),
                "max_pending_events": self.config.backpressure.max_pending_events,
            }),
        )
        .await?;
        self.append_status_event(
            "stream_event_dead_lettered",
            json!({
                "stream_id": self.config.stream_id,
                "event_id": event.id.0,
                "reason": reason,
            }),
        )
        .await
    }

    /// Logs an emitted window: the full envelope to the windows topic, plus a
    /// summary `StreamWindowEmitted` record on the shared lifecycle topic.
    async fn append_window_event(
        &self,
        envelope: &StreamWindowEnvelope,
    ) -> Result<(), DispatchError> {
        self.append_topic_event(
            TRIGGER_STREAM_WINDOWS_TOPIC,
            "stream_window_emitted",
            serde_json::to_value(envelope)
                .map_err(|error| DispatchError::Serde(error.to_string()))?,
        )
        .await?;
        self.append_topic_event(
            TRIGGERS_LIFECYCLE_TOPIC,
            "StreamWindowEmitted",
            json!({
                "stream_id": envelope.stream_id,
                "window_id": envelope.window_id,
                "event_ids": envelope.event_ids,
                "replay_of_event_id": envelope.replay_of_event_id,
            }),
        )
        .await
    }

    /// Appends one gate decision to the gate topic (the decision cache).
    async fn append_gate_record(&self, record: StreamGateRecord) -> Result<(), DispatchError> {
        self.append_topic_event(
            TRIGGER_STREAM_GATE_TOPIC,
            "stream_gate_decision",
            serde_json::to_value(record)
                .map_err(|error| DispatchError::Serde(error.to_string()))?,
        )
        .await
    }

    /// Looks up the most recent gate decision for `gate_id` + `cache_key`.
    /// Performs a full linear scan of the gate topic (newest first); records
    /// that fail to deserialize are skipped silently.
    /// NOTE(review): O(topic length) per window — fine for small logs, may
    /// need an index if the gate topic grows large.
    async fn read_gate_record(
        &self,
        gate_id: &str,
        cache_key: &str,
    ) -> Result<Option<StreamGateRecord>, DispatchError> {
        let topic =
            Topic::new(TRIGGER_STREAM_GATE_TOPIC).expect("static stream gate topic is valid");
        let records = self
            .event_log
            .read_range(&topic, None, usize::MAX)
            .await
            .map_err(DispatchError::from)?;
        Ok(records
            .into_iter()
            .rev()
            .filter(|(_, event)| event.kind == "stream_gate_decision")
            .filter_map(|(_, event)| serde_json::from_value::<StreamGateRecord>(event.payload).ok())
            .find(|record| record.gate_id == gate_id && record.cache_key == cache_key))
    }

    /// Appends a status event, injecting a `status` snapshot into object
    /// payloads (non-object payloads pass through unchanged).
    async fn append_status_event(
        &self,
        kind: &str,
        mut payload: JsonValue,
    ) -> Result<(), DispatchError> {
        if let JsonValue::Object(ref mut object) = payload {
            object.insert("status".to_string(), json!(self.snapshot()));
        }
        self.append_topic_event(TRIGGER_STREAM_STATUS_TOPIC, kind, payload)
            .await
    }

    /// Low-level append helper; panics only if a compile-time topic constant
    /// is malformed (a programming error, hence `expect`).
    async fn append_topic_event(
        &self,
        topic: &str,
        kind: &str,
        payload: JsonValue,
    ) -> Result<(), DispatchError> {
        self.event_log
            .append(
                &Topic::new(topic).expect("static stream trigger topic is valid"),
                LogEvent::new(kind, payload),
            )
            .await
            .map(|_| ())
            .map_err(DispatchError::from)
    }
}
642
643pub fn stream_window_summary(event: &TriggerEvent) -> Option<StreamWindowEnvelope> {
644    let batch = event.batch.as_ref()?;
645    let event_ids = batch
646        .iter()
647        .filter_map(|member| member.get("id"))
648        .filter_map(JsonValue::as_str)
649        .map(ToString::to_string)
650        .collect::<Vec<_>>();
651    Some(StreamWindowEnvelope {
652        stream_id: event
653            .headers
654            .get("harn_stream_id")
655            .cloned()
656            .unwrap_or_default(),
657        window_id: event
658            .headers
659            .get("harn_stream_window_id")
660            .cloned()
661            .unwrap_or_else(|| event.dedupe_key.clone()),
662        event_ids,
663        dedupe_keys: batch
664            .iter()
665            .filter_map(|member| member.get("dedupe_key"))
666            .filter_map(JsonValue::as_str)
667            .map(ToString::to_string)
668            .collect(),
669        first_occurred_at: None,
670        last_occurred_at: None,
671        replay_of_event_id: None,
672    })
673}
674
/// Builds a deterministic unsigned test event for stream fixtures.
///
/// The dedupe key is `"{stream}:{offset}"`, `occurred_at` comes from the test
/// clock, and the raw payload wraps `payload` with the event kind, stream
/// name, and offset. If provider-payload normalization fails, a generic
/// webhook payload tagged `stream_fixture` is used instead so the fixture
/// never errors.
pub fn stream_fixture_event(
    provider: impl Into<String>,
    kind: impl Into<String>,
    stream: impl Into<String>,
    offset: u64,
    payload: JsonValue,
) -> TriggerEvent {
    let provider = provider.into();
    let stream = stream.into();
    let kind = kind.into();
    let provider_id = crate::triggers::ProviderId::from(provider.clone());
    let raw = json!({
        "event": kind,
        "stream": stream,
        "offset": offset,
        "value": payload,
    });
    TriggerEvent::new(
        provider_id.clone(),
        kind.clone(),
        Some(clock::now_utc()),
        format!("{stream}:{offset}"),
        None,
        BTreeMap::new(),
        // Best-effort normalization; the fallback keeps fixtures infallible.
        ProviderPayload::normalize(&provider_id, &kind, &BTreeMap::new(), raw).unwrap_or_else(
            |_| {
                ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Webhook(
                    crate::triggers::GenericWebhookPayload {
                        source: Some("stream_fixture".to_string()),
                        content_type: Some("application/json".to_string()),
                        raw: json!({}),
                    },
                ))
            },
        ),
        SignatureStatus::Unsigned,
    )
}
713
714fn trim_hits(hits: &mut VecDeque<OffsetDateTime>, now: OffsetDateTime, period: Duration) {
715    let period = time::Duration::try_from(period).unwrap_or_default();
716    while hits.front().is_some_and(|hit| now - *hit >= period) {
717        hits.pop_front();
718    }
719}
720
/// Serde default for `StreamWindowConfig::step`: advance one event at a time.
fn default_window_step() -> usize {
    1
}
724
725mod duration_millis {
726    use std::time::Duration;
727
728    use serde::{Deserialize, Deserializer, Serializer};
729
730    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
731    where
732        S: Serializer,
733    {
734        serializer.serialize_u64(duration.as_millis() as u64)
735    }
736
737    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
738    where
739        D: Deserializer<'de>,
740    {
741        Ok(Duration::from_millis(u64::deserialize(deserializer)?))
742    }
743}
744
745#[cfg(test)]
746mod tests {
747    use std::cell::Cell;
748    use std::rc::Rc;
749    use std::sync::Arc;
750
751    use serde_json::json;
752
753    use crate::event_log::{install_default_for_base_dir, EventLog};
754    use crate::stdlib::register_vm_stdlib;
755    use crate::triggers::dispatcher::RetryPolicy;
756    use crate::triggers::TriggerRetryConfig;
757    use crate::triggers::{
758        clear_trigger_registry, install_manifest_triggers, ProviderId, TriggerBindingSource,
759        TriggerBindingSpec, TriggerHandlerSpec,
760    };
761    use crate::trust_graph::AutonomyTier;
762    use crate::vm::Vm;
763
764    use super::*;
765
    /// Test helper: reads every record currently stored under `topic`.
    async fn read_topic(
        log: Arc<AnyEventLog>,
        topic: &str,
    ) -> Vec<(u64, crate::event_log::LogEvent)> {
        log.read_range(&Topic::new(topic).unwrap(), None, usize::MAX)
            .await
            .unwrap()
    }
774
    /// Test fixture: writes `source` as a harn module in a temp dir, loads its
    /// `local_fn` export into a fresh VM, installs a manifest trigger binding
    /// that matches `issues.opened.window` events from the `kafka` provider,
    /// and returns the temp dir (kept alive for the test), the event log, and
    /// a dispatcher wired to both.
    async fn stream_dispatcher_fixture(
        source: &str,
    ) -> (tempfile::TempDir, Arc<AnyEventLog>, Dispatcher) {
        // Reset globals so earlier tests cannot leak registry state in.
        crate::reset_thread_local_state();
        clear_trigger_registry();
        let dir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(dir.path()).expect("event log");
        let lib_path = dir.path().join("lib.harn");
        std::fs::write(&lib_path, source).expect("write harn source");

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(dir.path());
        let exports = vm
            .load_module_exports(&lib_path)
            .await
            .expect("load stream handler");
        let handler = exports.get("local_fn").expect("local_fn export").clone();
        install_manifest_triggers(vec![TriggerBindingSpec {
            id: "stream-window-handler".to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "stream".to_string(),
            provider: ProviderId::from("kafka"),
            autonomy_tier: AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Local {
                raw: "local_fn".to_string(),
                closure: handler,
            },
            dispatch_priority: crate::triggers::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            // Single retry with minimal delay keeps tests fast.
            retry: TriggerRetryConfig::new(1, RetryPolicy::Linear { delay_ms: 1 }),
            match_events: vec!["issues.opened.window".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::triggers::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: "stream-window-handler:v1".to_string(),
        }])
        .await
        .expect("install stream trigger");

        (dir, log.clone(), Dispatcher::with_event_log(vm, log))
    }
827
    /// End-to-end: a tumbling window of size 2 buffers the first event,
    /// dispatches exactly once when the window fills, and the gate decision
    /// is cached so a replay (via `replay_of_event_id`) never re-invokes the
    /// gate closure.
    #[tokio::test(flavor = "current_thread")]
    async fn chat_stream_windows_gate_and_dispatch_deterministically() {
        // The dispatcher/VM is not Send, so drive the whole test on a LocalSet.
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                // Fixture script: the handler just returns the batch length,
                // letting the assertions below pin the window size.
                let (_dir, log, dispatcher) = stream_dispatcher_fixture(
                    r#"
import "std/triggers"

pub fn local_fn(event: TriggerEvent) -> int {
  return len(event.batch ?? [])
}
"#,
                )
                .await;

                // Tumbling(2): every 2 events form one non-overlapping window.
                // The gate is enabled with a fixed cache key so decisions are
                // memoized per window.
                let mut runtime = StreamTriggerRuntime::new(
                    StreamTriggerConfig {
                        stream_id: "chat:triage".to_string(),
                        window: StreamWindowConfig::tumbling(2),
                        backpressure: StreamBackpressureConfig::default(),
                        flow: StreamFlowConfig::default(),
                        gate: Some(StreamGateConfig {
                            gate_id: "chat-llm-gate".to_string(),
                            cache_key: "chat:v1".to_string(),
                            replay_of_event_id: None,
                        }),
                    },
                    log.clone(),
                    dispatcher,
                )
                .unwrap();

                // Counts how many times the gate closure actually runs.
                // Rc<Cell> because each iteration moves a clone into the
                // `move` closure on this single-threaded runtime.
                let calls = Rc::new(Cell::new(0));
                for offset in 0..2 {
                    let calls = calls.clone();
                    let outcomes = runtime
                        .push_event_with_gate(
                            stream_fixture_event(
                                "kafka",
                                "issues.opened",
                                "chat",
                                offset,
                                json!({"text": format!("message {offset}")}),
                            ),
                            move |_| {
                                calls.set(calls.get() + 1);
                                StreamGateOutcome::Pass {
                                    reason: Some("fixture-positive".to_string()),
                                }
                            },
                        )
                        .await
                        .unwrap();
                    if offset == 1 {
                        // Second event completes the window: exactly one
                        // dispatch whose result is the batch length (2).
                        assert_eq!(outcomes.len(), 1);
                        assert_eq!(outcomes[0].result, Some(json!(2)));
                    } else {
                        // First event only buffers; nothing dispatched yet.
                        assert!(outcomes.is_empty());
                    }
                }
                // The gate ran once for the single emitted window.
                assert_eq!(calls.get(), 1);

                // Exactly one gate decision was recorded on the gate topic.
                let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
                assert_eq!(
                    gate_events
                        .iter()
                        .filter(|(_, event)| event.kind == "stream_gate_decision")
                        .count(),
                    1
                );

                // Switch the runtime into replay mode, pointing at the window
                // id of the decision recorded above.
                let first_window_id = gate_events[0].1.payload["window_id"]
                    .as_str()
                    .unwrap()
                    .to_string();
                runtime.config.gate.as_mut().unwrap().replay_of_event_id =
                    Some(first_window_id.clone());

                // Re-push the same two events. The gate closure here returns
                // Block on purpose: if the cache is honored it must never run,
                // so the Block outcome can never be observed.
                for offset in 0..2 {
                    let calls = calls.clone();
                    runtime
                        .push_event_with_gate(
                            stream_fixture_event(
                                "kafka",
                                "issues.opened",
                                "chat",
                                offset,
                                json!({"text": format!("message {offset}")}),
                            ),
                            move |_| {
                                calls.set(calls.get() + 1);
                                StreamGateOutcome::Block {
                                    reason: Some("should-not-run".to_string()),
                                }
                            },
                        )
                        .await
                        .unwrap();
                }
                // Still 1: the replay used the cached decision.
                assert_eq!(calls.get(), 1);
                // The replayed decision is logged as cached and references the
                // original window id.
                let gate_events = read_topic(log.clone(), TRIGGER_STREAM_GATE_TOPIC).await;
                assert!(gate_events.iter().any(|(_, event)| {
                    event.kind == "stream_gate_decision"
                        && event.payload["cached"] == json!(true)
                        && event.payload["window_id"] == json!(first_window_id)
                }));
            })
            .await;
    }
938
939    #[tokio::test(flavor = "current_thread")]
940    async fn issue_feed_backpressure_dead_letters_overflow() {
941        let local = tokio::task::LocalSet::new();
942        local
943            .run_until(async {
944                let dir = tempfile::tempdir().unwrap();
945                let log = install_default_for_base_dir(dir.path()).unwrap();
946                let mut vm = Vm::new();
947                register_vm_stdlib(&mut vm);
948                vm.set_source_dir(dir.path());
949                let dispatcher = Dispatcher::with_event_log(vm, log.clone());
950                let mut runtime = StreamTriggerRuntime::new(
951                    StreamTriggerConfig {
952                        stream_id: "issue-feed".to_string(),
953                        window: StreamWindowConfig::fixed(3),
954                        backpressure: StreamBackpressureConfig {
955                            max_pending_events: 1,
956                            overflow: StreamOverflowPolicy::DeadLetterNewest,
957                        },
958                        flow: StreamFlowConfig::default(),
959                        gate: None,
960                    },
961                    log.clone(),
962                    dispatcher,
963                )
964                .unwrap();
965
966                runtime
967                    .push_event(stream_fixture_event(
968                        "kafka",
969                        "issues.opened",
970                        "issues",
971                        1,
972                        json!({"number": 1}),
973                    ))
974                    .await
975                    .unwrap();
976                runtime
977                    .push_event(stream_fixture_event(
978                        "kafka",
979                        "issues.opened",
980                        "issues",
981                        2,
982                        json!({"number": 2}),
983                    ))
984                    .await
985                    .unwrap();
986
987                let snapshot = runtime.snapshot();
988                assert_eq!(snapshot.pending_events, 1);
989                assert_eq!(snapshot.dead_lettered_events, 1);
990
991                let dlq = read_topic(log.clone(), TRIGGER_DLQ_TOPIC).await;
992                assert!(dlq.iter().any(|(_, event)| {
993                    event.kind == "stream_dead_lettered"
994                        && event.payload["reason"] == json!("backpressure_queue_full")
995                }));
996            })
997            .await;
998    }
999
1000    #[tokio::test(flavor = "current_thread")]
1001    async fn sliding_windows_emit_overlapping_batches() {
1002        let local = tokio::task::LocalSet::new();
1003        local
1004            .run_until(async {
1005                let (_dir, log, dispatcher) = stream_dispatcher_fixture(
1006                    r#"
1007import "std/triggers"
1008
1009pub fn local_fn(event: TriggerEvent) -> int {
1010  return len(event.batch ?? [])
1011}
1012"#,
1013                )
1014                .await;
1015                let mut runtime = StreamTriggerRuntime::new(
1016                    StreamTriggerConfig {
1017                        stream_id: "sliding".to_string(),
1018                        window: StreamWindowConfig::sliding(2, 1),
1019                        backpressure: StreamBackpressureConfig::default(),
1020                        flow: StreamFlowConfig::default(),
1021                        gate: None,
1022                    },
1023                    log.clone(),
1024                    dispatcher,
1025                )
1026                .unwrap();
1027
1028                let mut dispatched = 0;
1029                for offset in 1..=3 {
1030                    dispatched += runtime
1031                        .push_event(stream_fixture_event(
1032                            "kafka",
1033                            "issues.opened",
1034                            "sliding",
1035                            offset,
1036                            json!({"offset": offset}),
1037                        ))
1038                        .await
1039                        .unwrap()
1040                        .len();
1041                }
1042
1043                assert_eq!(dispatched, 2);
1044                assert_eq!(runtime.snapshot().emitted_windows, 2);
1045                let windows = read_topic(log, TRIGGER_STREAM_WINDOWS_TOPIC).await;
1046                assert_eq!(windows.len(), 2);
1047            })
1048            .await;
1049    }
1050}