Skip to main content

akribes_sdk/sub/
events.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use tokio::sync::{mpsc, oneshot};
5use tokio::task::JoinHandle;
6
7use crate::client::Inner;
8use crate::error::{AkribesError, Result};
9use crate::models::*;
10
11/// Handle to a background SSE stream. Dropping it cancels all associated
12/// tasks (listener and any filter/translator tasks spawned alongside it).
13pub struct EventSubscription {
14    handles: Vec<JoinHandle<()>>,
15}
16
17impl EventSubscription {
18    /// Explicitly cancel the subscription.
19    pub fn cancel(self) {
20        for h in &self.handles {
21            h.abort();
22        }
23    }
24
25    pub(crate) fn from_handle(handle: JoinHandle<()>) -> Self {
26        Self {
27            handles: vec![handle],
28        }
29    }
30
31    pub(crate) fn from_handles(handles: Vec<JoinHandle<()>>) -> Self {
32        Self { handles }
33    }
34}
35
36impl Drop for EventSubscription {
37    fn drop(&mut self) {
38        for h in &self.handles {
39            h.abort();
40        }
41    }
42}
43
44/// Sub-client for SSE event streams. Obtained via [`AkribesClient::events()`].
45#[derive(Clone, Debug)]
46pub struct EventsClient {
47    pub(crate) inner: Arc<Inner>,
48    pub(crate) project_id: i64,
49}
50
51impl EventsClient {
52    pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
53        Self { inner, project_id }
54    }
55
56    /// Open an SSE event stream and return a receiver + subscription handle.
57    ///
58    /// Events are sent to the returned `mpsc::UnboundedReceiver`. Dropping the
59    /// `EventSubscription` cancels the background task automatically.
60    ///
61    /// **Note:** The channel is unbounded — a slow consumer on a busy execution
62    /// stream can cause unbounded memory growth. Callers should process events
63    /// promptly, use [`tokio::sync::mpsc::Receiver::try_recv`] to drain, or
64    /// prefer [`event_stream_bounded`](Self::event_stream_bounded) (#1117)
65    /// when consumer back-pressure is required.
66    pub async fn event_stream(
67        &self,
68        script_name: Option<&str>,
69    ) -> Result<(mpsc::UnboundedReceiver<HubEvent>, EventSubscription)> {
70        let base_url = self.inner.base_url.clone();
71        let project_id = self.project_id;
72        let script_name = script_name.map(|s| s.to_string());
73        let (tx, rx) = mpsc::unbounded_channel();
74        let http = self.inner.http.clone();
75        let token = self.inner.token.clone();
76
77        let handle = tokio::spawn(async move {
78            let _ = stream_sse_with_retry(http, token, base_url, project_id, script_name, tx, None)
79                .await;
80        });
81
82        Ok((
83            rx,
84            EventSubscription {
85                handles: vec![handle],
86            },
87        ))
88    }
89
90    /// Open an SSE event stream on a **bounded** channel (#1117).
91    ///
92    /// `buffer` is the channel's max in-flight event count. When the
93    /// consumer can't keep up, the background SSE listener applies
94    /// back-pressure: it parks until the consumer drains a slot.
95    /// This is the safer default for long-lived subscriptions on busy
96    /// executions — the unbounded variant can grow unboundedly when
97    /// the consumer stalls. The trade-off is that prolonged stalls can
98    /// stall the SSE listener too, which counts against the
99    /// server-side keepalive window; pick `buffer` generously
100    /// (e.g. 1024) when in doubt.
101    ///
102    /// Returns a standard bounded `mpsc::Receiver`; otherwise identical
103    /// to [`event_stream`](Self::event_stream).
104    pub async fn event_stream_bounded(
105        &self,
106        script_name: Option<&str>,
107        buffer: usize,
108    ) -> Result<(mpsc::Receiver<HubEvent>, EventSubscription)> {
109        let base_url = self.inner.base_url.clone();
110        let project_id = self.project_id;
111        let script_name = script_name.map(|s| s.to_string());
112        let (tx_bounded, rx_bounded) = mpsc::channel::<HubEvent>(buffer.max(1));
113        // The internal SSE pipeline uses UnboundedSender; we forward
114        // events from it to the bounded sender, applying back-pressure
115        // there. A small inner buffer keeps the SSE parser unblocked
116        // during transient sender-side awaits.
117        let (tx_inner, mut rx_inner) = mpsc::unbounded_channel::<HubEvent>();
118        let http = self.inner.http.clone();
119        let token = self.inner.token.clone();
120
121        let sse_handle = tokio::spawn(async move {
122            let _ = stream_sse_with_retry(
123                http,
124                token,
125                base_url,
126                project_id,
127                script_name,
128                tx_inner,
129                None,
130            )
131            .await;
132        });
133        let forward_handle = tokio::spawn(async move {
134            while let Some(evt) = rx_inner.recv().await {
135                if tx_bounded.send(evt).await.is_err() {
136                    break;
137                }
138            }
139        });
140
141        Ok((
142            rx_bounded,
143            EventSubscription {
144                handles: vec![sse_handle, forward_handle],
145            },
146        ))
147    }
148
149    /// Stream execution engine events for a specific script.
150    pub async fn execution_stream(
151        &self,
152        script_name: &str,
153    ) -> Result<(mpsc::UnboundedReceiver<EngineEvent>, EventSubscription)> {
154        let (mut hub_rx, sub) = self.event_stream(Some(script_name)).await?;
155        let (tx, rx) = mpsc::unbounded_channel();
156
157        let outer_handle = tokio::spawn(async move {
158            while let Some(evt) = hub_rx.recv().await {
159                if let HubEvent::Execution { event, .. } = evt {
160                    if tx.send(event).is_err() {
161                        break;
162                    }
163                }
164            }
165        });
166
167        let combined = EventSubscription {
168            handles: vec![tokio::spawn(async move {
169                let _sub = sub;
170                outer_handle.await.ok();
171            })],
172        };
173
174        Ok((rx, combined))
175    }
176
177    /// Stream execution events translated to typed [`WorkflowEvent`]s for a
178    /// specific script (#1239 — mirrors Python `events.typed_engine_events`).
179    ///
180    /// Functionally identical to [`execution_stream`](Self::execution_stream),
181    /// but each event is passed through `WorkflowEvent::from(EngineEvent)`
182    /// before being yielded so consumers can pattern-match on typed
183    /// variants instead of inspecting raw `EngineEvent` payloads. Use
184    /// this when you want the same ergonomics as `RunStream`'s typed
185    /// iterator on a free-standing execution subscription (e.g. attaching
186    /// to a run started by someone else).
187    pub async fn typed_execution_stream(
188        &self,
189        script_name: &str,
190    ) -> Result<(
191        mpsc::UnboundedReceiver<crate::events::WorkflowEvent>,
192        EventSubscription,
193    )> {
194        let (mut raw_rx, sub) = self.execution_stream(script_name).await?;
195        let (tx, rx) = mpsc::unbounded_channel();
196        let outer_handle = tokio::spawn(async move {
197            while let Some(evt) = raw_rx.recv().await {
198                let typed: crate::events::WorkflowEvent = evt.into();
199                if tx.send(typed).is_err() {
200                    break;
201                }
202            }
203        });
204        let combined = EventSubscription {
205            handles: vec![tokio::spawn(async move {
206                let _sub = sub;
207                outer_handle.await.ok();
208            })],
209        };
210        Ok((rx, combined))
211    }
212
213    /// Convenience: call `callback` for every hub event.
214    pub async fn on_events<F>(
215        &self,
216        script_name: Option<&str>,
217        mut callback: F,
218    ) -> Result<EventSubscription>
219    where
220        F: FnMut(HubEvent) + Send + 'static,
221    {
222        let (mut rx, sub) = self.event_stream(script_name).await?;
223        let handle = tokio::spawn(async move {
224            let _sub = sub;
225            while let Some(evt) = rx.recv().await {
226                callback(evt);
227            }
228        });
229        Ok(EventSubscription {
230            handles: vec![handle],
231        })
232    }
233
234    /// Convenience: call `callback` for every execution event on a script.
235    pub async fn on_script_execution<F>(
236        &self,
237        script_name: &str,
238        mut callback: F,
239    ) -> Result<EventSubscription>
240    where
241        F: FnMut(EngineEvent) + Send + 'static,
242    {
243        let (mut rx, sub) = self.execution_stream(script_name).await?;
244        let handle = tokio::spawn(async move {
245            let _sub = sub;
246            while let Some(evt) = rx.recv().await {
247                callback(evt);
248            }
249        });
250        Ok(EventSubscription {
251            handles: vec![handle],
252        })
253    }
254
255    /// Convenience: call `callback` on script version updates.
256    pub async fn on_script_change<F>(
257        &self,
258        script_name: &str,
259        mut callback: F,
260    ) -> Result<EventSubscription>
261    where
262        F: FnMut(i64, Option<String>) + Send + 'static,
263    {
264        let name = script_name.to_string();
265        self.on_events(Some(script_name), move |hub_evt| {
266            if let HubEvent::Registry(RegistryEvent::ScriptUpdated {
267                script_name: ref evt_name,
268                version_id,
269                ref channel,
270                ..
271            }) = hub_evt
272            {
273                if *evt_name == name {
274                    callback(version_id, channel.clone());
275                }
276            }
277        })
278        .await
279    }
280
281    /// Like [`on_script_change`](Self::on_script_change), but also marks the
282    /// script as broken in the contract state so that subsequent `run()` calls
283    /// raise before POSTing (matching the TS and Python SDK behaviour).
284    pub async fn on_script_schema_change<F>(
285        &self,
286        script_name: &str,
287        mut callback: F,
288    ) -> Result<EventSubscription>
289    where
290        F: FnMut(i64, Option<String>) + Send + 'static,
291    {
292        let name = script_name.to_string();
293        let inner = Arc::clone(&self.inner);
294        self.on_events(Some(script_name), move |hub_evt| {
295            if let HubEvent::Registry(RegistryEvent::ScriptUpdated {
296                script_name: ref evt_name,
297                version_id,
298                ref channel,
299                ..
300            }) = hub_evt
301            {
302                if *evt_name == name {
303                    inner.broken_scripts.lock().unwrap().insert(name.clone());
304                    callback(version_id, channel.clone());
305                }
306            }
307        })
308        .await
309    }
310}
311
312/// Build the SSE URL. The bearer token is delivered exclusively via the
313/// `Authorization` header on the inner request (see `stream_sse`); older
314/// revisions also appended `&token=...` here so any future caller that
315/// invoked this URL directly (e.g. an EventSource shim) would still
316/// authenticate, but that put long-lived service-token secrets into
317/// reverse-proxy access logs and OTel `http.url` span attributes.
318async fn build_events_url(base_url: &str, project_id: i64, script_name: Option<&str>) -> String {
319    let mut url = format!("{}/events?project_id={}", base_url, project_id);
320    if let Some(name) = script_name {
321        url.push_str(&format!("&script_name={}", urlencoding::encode(name)));
322    }
323    url
324}
325
326/// Retry wrapper around stream_sse with exponential-with-jitter backoff
327/// (base 1s, cap 30s — same curve as heartbeat per #1182).
328///
329/// If `ready_tx` is `Some`, it fires exactly once:
330/// - `Ok(())` the moment the first `stream_sse` call returns a 2xx HTTP
331///   response (the subscription is live on the server).
332/// - `Err(e)` if all retries are exhausted without ever connecting.
333/// This lets callers block on "SSE subscribed" before issuing a dependent
334/// `POST /run` to avoid the subscribe-after-POST race where opening events
335/// are lost.
336///
337/// Tracks the `Last-Event-ID` cursor across reconnects (#1101): every
338/// successful event updates `last_event_id`, and each retry replays it
339/// as the SSE-spec `Last-Event-ID` request header so the server can
340/// resume from the gap when DB-backed replay lands.
341pub(crate) async fn stream_sse_with_retry(
342    http: reqwest::Client,
343    token: Arc<tokio::sync::RwLock<Option<String>>>,
344    base_url: String,
345    project_id: i64,
346    script_name: Option<String>,
347    tx: mpsc::UnboundedSender<HubEvent>,
348    mut ready_tx: Option<oneshot::Sender<Result<()>>>,
349) -> Result<()> {
350    let max_retries = 5u32;
351    let mut attempt = 0;
352    // Wrapped in an Arc<Mutex<...>> so `stream_sse` can update it in-place
353    // as it consumes events. The retry wrapper reads its current value on
354    // each (re)connect and threads it through the request headers.
355    let last_event_id: Arc<std::sync::Mutex<Option<i64>>> = Arc::new(std::sync::Mutex::new(None));
356    loop {
357        let url = build_events_url(&base_url, project_id, script_name.as_deref()).await;
358        let cursor = *last_event_id.lock().unwrap();
359        match stream_sse(
360            http.clone(),
361            token.clone(),
362            &url,
363            tx.clone(),
364            &mut ready_tx,
365            cursor,
366            Arc::clone(&last_event_id),
367        )
368        .await
369        {
370            Ok(()) => return Ok(()),
371            Err(e) => {
372                attempt += 1;
373                if attempt > max_retries || tx.is_closed() {
374                    if let Some(rt) = ready_tx.take() {
375                        let _ = rt.send(Err(AkribesError::Other(format!(
376                            "SSE subscribe failed after {} attempts: {}",
377                            attempt, e
378                        ))));
379                    }
380                    return Err(e);
381                }
382                let delay = retry_backoff(attempt);
383                tracing::warn!(attempt, max_retries, ?delay, "SSE disconnected, retrying");
384                tokio::time::sleep(delay).await;
385            }
386        }
387    }
388}
389
390/// Open the bench-run SSE stream at `GET /bench-runs/{run_id}/events`
391/// and forward each decoded [`BenchRunEvent`] to `tx`.
392///
393/// Reuses the same byte deframer ([`split_sse_message_bytes`]) and
394/// field parser ([`parse_sse_message`]) as the hub `/events` reader, but
395/// decodes the bench endpoint's named frames (`result` / `lagged` /
396/// `terminal`) instead of the hub's `Vec<HubEvent>` batches.
397///
398/// Single-connection (no retry/backoff): a bench run is short-lived and
399/// the server emits a synthetic `terminal` frame before closing the
400/// stream, so there is no replay cursor to resume from. The function
401/// returns once the HTTP body ends (after the `terminal` frame) or `tx`
402/// is dropped. A non-2xx response surfaces as `Err` to the caller.
403pub(crate) async fn stream_bench_run_events(
404    http: reqwest::Client,
405    token: Arc<tokio::sync::RwLock<Option<String>>>,
406    base_url: String,
407    run_id: i64,
408    tx: mpsc::UnboundedSender<BenchRunEvent>,
409    mut ready_tx: Option<oneshot::Sender<Result<()>>>,
410) -> Result<()> {
411    let url = format!("{}/bench-runs/{}/events", base_url, run_id);
412    let mut req = http.get(&url).header("Accept", "text/event-stream");
413    if let Some(ref t) = *token.read().await {
414        req = req.bearer_auth(t);
415    }
416    let res = match req.send().await.map_err(AkribesError::Http) {
417        Ok(r) => r,
418        Err(e) => {
419            if let Some(rt) = ready_tx.take() {
420                let _ = rt.send(Err(AkribesError::Other(format!(
421                    "bench SSE subscribe failed: {e}"
422                ))));
423            }
424            return Err(e);
425        }
426    };
427    if !res.status().is_success() {
428        let status = res.status().as_u16();
429        let err = AkribesError::HttpStatus {
430            status,
431            message: format!("bench SSE subscribe failed: {}", res.status()),
432        };
433        if let Some(rt) = ready_tx.take() {
434            let _ = rt.send(Err(AkribesError::HttpStatus {
435                status,
436                message: format!("bench SSE subscribe failed: {}", res.status()),
437            }));
438        }
439        return Err(err);
440    }
441    if let Some(rt) = ready_tx.take() {
442        let _ = rt.send(Ok(()));
443    }
444
445    let mut stream = res.bytes_stream();
446    let mut buf: Vec<u8> = Vec::new();
447    while let Some(chunk) = stream.next().await {
448        let chunk = chunk.map_err(AkribesError::Http)?;
449        buf.extend_from_slice(&chunk);
450        while let Some((msg_bytes, delim_len)) = split_sse_message_bytes(&buf) {
451            let message = String::from_utf8_lossy(&buf[..msg_bytes]).into_owned();
452            buf.drain(..msg_bytes + delim_len);
453            let Some(frame) = parse_sse_message(&message) else {
454                continue;
455            };
456            match frame.event_type.as_str() {
457                "result" => match serde_json::from_str::<BenchResult>(&frame.data) {
458                    Ok(r) => {
459                        if tx.send(BenchRunEvent::Result(Box::new(r))).is_err() {
460                            return Ok(());
461                        }
462                    }
463                    Err(e) => {
464                        tracing::warn!(error = %e, "bench SSE result parse error");
465                    }
466                },
467                "lagged" => {
468                    let dropped = serde_json::from_str::<serde_json::Value>(&frame.data)
469                        .ok()
470                        .and_then(|v| v.get("dropped").and_then(|d| d.as_u64()))
471                        .unwrap_or(0);
472                    if tx.send(BenchRunEvent::Lagged { dropped }).is_err() {
473                        return Ok(());
474                    }
475                }
476                "terminal" => {
477                    let status = serde_json::from_str::<serde_json::Value>(&frame.data)
478                        .ok()
479                        .and_then(|v| {
480                            v.get("status")
481                                .and_then(|s| s.as_str())
482                                .map(|s| s.to_string())
483                        })
484                        .unwrap_or_else(|| "unknown".to_string());
485                    // Best-effort: deliver the terminal marker, then stop.
486                    let _ = tx.send(BenchRunEvent::Terminal { status });
487                    return Ok(());
488                }
489                other => {
490                    tracing::warn!(event_type = other, "ignoring unknown bench SSE event type");
491                }
492            }
493        }
494    }
495    Ok(())
496}
497
/// Canonical SDK-wide backoff curve shared by SSE reconnects and heartbeats (#1182).
///
/// Exponential growth from a 1s base, capped at 30s, with full jitter: for
/// `attempt >= 1` the delay is drawn from `[0, ceiling)` milliseconds, where
/// `ceiling = min(1s * 2^(attempt - 1), 30s)`. `attempt == 0` means "no wait".
/// The jitter source is the sub-second nanosecond part of the wall clock,
/// which is cheap but is not a real random number generator.
fn retry_backoff(attempt: u32) -> std::time::Duration {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    const BASE_MS: u64 = 1_000;
    const CAP_MS: u64 = 30_000;

    let doublings = match attempt.checked_sub(1) {
        None => return Duration::ZERO,
        Some(n) => n.min(20),
    };
    // Clamping the shift to 20 keeps `BASE_MS << n` far from overflow; the
    // 30s cap is reached after only a handful of doublings anyway.
    let ceiling_ms = (BASE_MS << doublings).min(CAP_MS);
    let entropy = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| u64::from(since_epoch.subsec_nanos()));
    let jitter_ms = match ceiling_ms {
        0 => 0,
        ceiling => entropy % ceiling,
    };
    Duration::from_millis(jitter_ms)
}
515
516/// Parse an SSE byte stream and send deserialized events to the channel.
517///
518/// `ready_tx` (if present) fires once the server returns a 2xx response,
519/// indicating the subscription is active. Non-2xx and transport errors
520/// return `Err` without firing; the retry wrapper is responsible for
521/// deciding whether to fire the signal after retries are exhausted.
522///
523/// `cursor` (when `Some`) is sent as the `Last-Event-ID` header on the
524/// request — the SSE-spec mechanism for resuming after a transport drop.
525/// `last_event_id_out` is updated in-place as `id:` lines arrive, so the
526/// retry wrapper has the latest cursor for the *next* attempt.
527async fn stream_sse(
528    http: reqwest::Client,
529    token: Arc<tokio::sync::RwLock<Option<String>>>,
530    url: &str,
531    tx: mpsc::UnboundedSender<HubEvent>,
532    ready_tx: &mut Option<oneshot::Sender<Result<()>>>,
533    cursor: Option<i64>,
534    last_event_id_out: Arc<std::sync::Mutex<Option<i64>>>,
535) -> Result<()> {
536    let mut req = http.get(url).header("Accept", "text/event-stream");
537    if let Some(ref t) = *token.read().await {
538        req = req.bearer_auth(t);
539    }
540    if let Some(seq) = cursor {
541        req = req.header("Last-Event-ID", seq.to_string());
542    }
543    let res = req.send().await.map_err(AkribesError::Http)?;
544    if !res.status().is_success() {
545        return Err(AkribesError::HttpStatus {
546            status: res.status().as_u16(),
547            message: format!("SSE subscribe failed: {}", res.status()),
548        });
549    }
550    if let Some(rt) = ready_tx.take() {
551        let _ = rt.send(Ok(()));
552    }
553    let mut stream = res.bytes_stream();
554    // Buffer raw bytes — reqwest's bytes_stream() yields arbitrary chunks
555    // that do NOT respect UTF-8 character boundaries, so we can only decode
556    // a complete SSE message once we have its delimiter.
557    let mut buf: Vec<u8> = Vec::new();
558
559    while let Some(chunk) = stream.next().await {
560        let chunk = chunk.map_err(AkribesError::Http)?;
561        buf.extend_from_slice(&chunk);
562
563        // Process complete SSE messages. Support \n\n, \r\n\r\n, \r\r per spec.
564        while let Some((msg_bytes, delim_len)) = split_sse_message_bytes(&buf) {
565            // Decode just the completed message as UTF-8. If the server sent
566            // invalid bytes inside a message, replace them lossily rather
567            // than tearing down the stream.
568            let message = String::from_utf8_lossy(&buf[..msg_bytes]).into_owned();
569            buf.drain(..msg_bytes + delim_len);
570
571            let Some(frame) = parse_sse_message(&message) else {
572                continue;
573            };
574            let SseFrame {
575                event_type,
576                data,
577                event_id,
578            } = frame;
579            // Persist the cursor so the retry wrapper sees the latest
580            // `seq` we received before any subsequent disconnect.
581            if let Some(seq) = event_id {
582                *last_event_id_out.lock().unwrap() = Some(seq);
583            }
584
585            if event_type == "batch" || event_type.is_empty() {
586                // Decode the batch element-by-element rather than as a single
587                // `Vec<HubEvent>`. A batch frequently mixes event kinds (e.g.
588                // co-occurring `Execution` and `Bench` frames), and the hub
589                // can introduce new `type` discriminants the SDK predates. A
590                // monolithic `Vec<HubEvent>` decode aborts the WHOLE batch on
591                // the first unrecognised `type`, silently dropping the known
592                // events alongside it. Per-element decode skips only the
593                // element we can't model and still delivers the rest.
594                match serde_json::from_str::<Vec<serde_json::Value>>(&data) {
595                    Ok(raw_events) => {
596                        for raw in raw_events {
597                            match serde_json::from_value::<HubEvent>(raw) {
598                                Ok(evt) => {
599                                    if tx.send(evt).is_err() {
600                                        return Ok(());
601                                    }
602                                }
603                                Err(e) => {
604                                    tracing::warn!(
605                                        error = %e,
606                                        "skipping unrecognised hub event in batch"
607                                    );
608                                }
609                            }
610                        }
611                    }
612                    Err(e) => {
613                        tracing::warn!(error = %e, "SSE JSON parse error");
614                    }
615                }
616            } else {
617                tracing::warn!(event_type, "ignoring unknown SSE event type");
618            }
619        }
620    }
621
622    Ok(())
623}
624
/// One decoded SSE frame: the `event:` type (empty when omitted), the
/// joined `data:` payload, and the optional numeric `id:` cursor. Shared
/// shape so multiple endpoint readers (the hub `/events` stream and the
/// bench `/bench-runs/{id}/events` stream) can reuse the same byte
/// deframer + line parser without each re-implementing SSE framing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SseFrame {
    pub event_type: String,
    pub data: String,
    pub event_id: Option<i64>,
}

/// Parse one complete SSE message (already deframed by
/// [`split_sse_message_bytes`]) into its `event:` / `data:` / `id:` fields.
///
/// Follows the SSE field grammar:
/// - lines may end in CRLF, LF, or a bare CR;
/// - lines starting with `:` are comments;
/// - `name: value` loses at most one space after the colon;
/// - a line without a colon is a field with an empty value (so a bare
///   `data` line contributes an empty data line).
///
/// Multiple `data:` lines are joined with `\n`. An `id:` that is not an
/// `i64` yields `event_id: None`. Returns `None` when the message carried no
/// `data` field (a bare keepalive or comment), which callers skip.
pub(crate) fn parse_sse_message(message: &str) -> Option<SseFrame> {
    let mut data_parts: Vec<&str> = Vec::new();
    let mut event_type = String::new();
    let mut event_id: Option<i64> = None;
    // `str::lines()` only splits on LF/CRLF. The deframer also accepts
    // CR-only streams (`\r\r`), whose lines are separated by a bare `\r`, so
    // split on both characters. The resulting empty pieces (from CRLF) are
    // skipped.
    for line in message.split(|c: char| c == '\r' || c == '\n') {
        if line.is_empty() || line.starts_with(':') {
            continue;
        }
        let (field, value) = match line.split_once(':') {
            Some((field, value)) => (field, value.strip_prefix(' ').unwrap_or(value)),
            None => (line, ""),
        };
        match field {
            "data" => data_parts.push(value),
            "event" => event_type = value.to_string(),
            "id" => event_id = value.parse::<i64>().ok(),
            _ => {}
        }
    }
    if data_parts.is_empty() {
        return None;
    }
    Some(SseFrame {
        event_type,
        data: data_parts.join("\n"),
        event_id,
    })
}
669
/// Locate the first complete SSE message in `buf`.
///
/// Returns `Some((message_len, delimiter_len))`, or `None` while no complete
/// message has arrived. The caller decodes `buf[..message_len]` and then
/// drains `message_len + delimiter_len` bytes.
///
/// The SSE spec allows `\n\n`, `\r\n\r\n` and `\r\r` as the blank-line
/// delimiter, and an intermediary that rewrites line endings can mix them
/// within one stream. The split must land on the EARLIEST delimiter present,
/// not merely the first kind a fixed-order scan finds. Otherwise two adjacent
/// events would be merged and parsed as one malformed message. Mirrors the
/// TS SDK's `findSseDelimiter`.
pub(crate) fn split_sse_message_bytes(buf: &[u8]) -> Option<(usize, usize)> {
    const DELIMITERS: [&[u8]; 3] = [b"\r\n\r\n", b"\n\n", b"\r\r"];
    DELIMITERS
        .iter()
        .filter_map(|delim| find_bytes(buf, delim).map(|pos| (pos, delim.len())))
        // `min_by_key` keeps the first of equal minima, matching a scan in
        // DELIMITERS order.
        .min_by_key(|&(pos, _)| pos)
}

/// Byte offset of the first occurrence of `needle` in `haystack`, if any.
fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    // `windows(0)` panics, so an empty needle is rejected up front.
    if needle.is_empty() || needle.len() > haystack.len() {
        return None;
    }
    haystack
        .windows(needle.len())
        .position(|window| window == needle)
}
704
#[cfg(test)]
mod sse_split_tests {
    use super::split_sse_message_bytes;

    #[test]
    fn picks_lf_lf_when_alone() {
        let buf = b"event: ping\ndata: 1\n\nrest";
        let (msg_len, delim_len) = split_sse_message_bytes(buf).expect("delim found");
        assert_eq!(&buf[..msg_len], b"event: ping\ndata: 1");
        assert_eq!(delim_len, 2);
    }

    #[test]
    fn picks_crlf_crlf_when_alone() {
        let buf = b"event: ping\r\ndata: 1\r\n\r\nrest";
        let (msg_len, delim_len) = split_sse_message_bytes(buf).expect("delim found");
        assert_eq!(&buf[..msg_len], b"event: ping\r\ndata: 1");
        assert_eq!(delim_len, 4);
    }

    #[test]
    fn picks_cr_cr_when_alone() {
        // CR-only line endings are legal SSE; the third delimiter form
        // previously had no coverage.
        let buf = b"event: ping\rdata: 1\r\rrest";
        let (msg_len, delim_len) = split_sse_message_bytes(buf).expect("delim found");
        assert_eq!(&buf[..msg_len], b"event: ping\rdata: 1");
        assert_eq!(delim_len, 2);
    }

    #[test]
    fn picks_earliest_delimiter_when_mixed() {
        // Earlier event uses LF/LF; later one uses CRLF/CRLF. The split
        // must land on the EARLIER `\n\n` or the two events get merged
        // into one message and the second falls into the "data:" parse
        // path of the first. Pre-fix this returned the CRLF position.
        let buf = b"data: a\n\ndata: b\r\n\r\n";
        let (msg_len, delim_len) = split_sse_message_bytes(buf).expect("delim found");
        assert_eq!(&buf[..msg_len], b"data: a");
        assert_eq!(delim_len, 2);
    }

    #[test]
    fn picks_earliest_delimiter_crlf_first() {
        // Reverse case: CRLF-terminated event first, LF-terminated after.
        let buf = b"data: a\r\n\r\ndata: b\n\n";
        let (msg_len, delim_len) = split_sse_message_bytes(buf).expect("delim found");
        assert_eq!(&buf[..msg_len], b"data: a");
        assert_eq!(delim_len, 4);
    }

    #[test]
    fn returns_none_without_delimiter() {
        let buf = b"data: incomplete";
        assert!(split_sse_message_bytes(buf).is_none());
    }

    #[test]
    fn returns_none_for_empty_buffer() {
        assert!(split_sse_message_bytes(b"").is_none());
    }

    #[test]
    fn lone_delimiter_yields_empty_message() {
        // A keepalive blank line produces a zero-length message that the
        // caller drains and skips.
        assert_eq!(split_sse_message_bytes(b"\n\n"), Some((0, 2)));
    }
}