Skip to main content

faucet_source_redis/
stream.rs

1//! Redis source stream executor.
2
3use crate::config::{RedisSourceConfig, RedisSourceType};
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use redis::AsyncCommands;
7use serde_json::{Value, json};
8use std::pin::Pin;
9
10/// A configured Redis source that reads records from Redis data structures.
11pub struct RedisSource {
12    config: RedisSourceConfig,
13    /// Lazily-opened multiplexed connection, reused across every `fetch_all`
14    /// and `stream_pages` call instead of opening a fresh client + TCP/AUTH
15    /// handshake per call (#78/#22). `MultiplexedConnection` is cheap to clone
16    /// (it shares one underlying socket), so each call clones the cached one.
17    conn: tokio::sync::OnceCell<redis::aio::MultiplexedConnection>,
18}
19
20impl RedisSource {
21    /// Create a new Redis source from the given configuration. The connection
22    /// is opened lazily on first use, so construction stays synchronous and does
23    /// no I/O; it fails only on an invalid config (an out-of-range `batch_size`).
24    pub fn new(config: RedisSourceConfig) -> Result<Self, FaucetError> {
25        faucet_core::validate_batch_size(config.batch_size)?;
26        Ok(Self {
27            config,
28            conn: tokio::sync::OnceCell::new(),
29        })
30    }
31
32    /// Return a clone of the shared multiplexed connection, opening it once on
33    /// first call.
34    async fn connection(&self) -> Result<redis::aio::MultiplexedConnection, FaucetError> {
35        let conn = self
36            .conn
37            .get_or_try_init(|| async {
38                let client = redis::Client::open(self.config.url.as_str())
39                    .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
40                client
41                    .get_multiplexed_async_connection()
42                    .await
43                    .map_err(|e| FaucetError::Source(format!("Redis connection failed: {e}")))
44            })
45            .await?;
46        Ok(conn.clone())
47    }
48
49    /// Fetch all records from the configured Redis source.
50    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
51        let mut conn = self.connection().await?;
52
53        let mut records = match &self.config.source_type {
54            RedisSourceType::List { key } => self.fetch_list(&mut conn, key).await?,
55            RedisSourceType::Stream {
56                key,
57                group,
58                consumer,
59                count,
60            } => {
61                self.fetch_stream(&mut conn, key, group, consumer, count)
62                    .await?
63            }
64            RedisSourceType::Keys { pattern } => self.fetch_keys(&mut conn, pattern).await?,
65        };
66
67        if let Some(max) = self.config.max_records {
68            records.truncate(max);
69        }
70
71        tracing::info!(records = records.len(), "Redis fetch complete");
72        Ok(records)
73    }
74
75    /// Read all elements from a Redis list.
76    async fn fetch_list(
77        &self,
78        conn: &mut redis::aio::MultiplexedConnection,
79        key: &str,
80    ) -> Result<Vec<Value>, FaucetError> {
81        let values: Vec<String> = conn
82            .lrange(key, 0, -1)
83            .await
84            .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
85
86        let records = values
87            .into_iter()
88            .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
89            .collect();
90
91        Ok(records)
92    }
93
94    /// Read entries from a Redis stream.
95    async fn fetch_stream(
96        &self,
97        conn: &mut redis::aio::MultiplexedConnection,
98        key: &str,
99        group: &Option<String>,
100        consumer: &Option<String>,
101        count: &Option<usize>,
102    ) -> Result<Vec<Value>, FaucetError> {
103        let mut records = Vec::new();
104        match (group, consumer) {
105            (Some(group_name), Some(consumer_name)) => {
106                // Drain ALL currently-pending new messages for the group, not just
107                // the first `count`. A single XREADGROUP with the default
108                // `count = 100` silently truncated the rest (#146 narrowed); loop,
109                // consuming `>` until a short read (fewer than requested → the
110                // backlog is drained) or the `max_records` cap is hit.
111                let per_read = count.unwrap_or(100).max(1);
112                loop {
113                    let opts = redis::streams::StreamReadOptions::default().count(per_read);
114                    let reply: redis::streams::StreamReadReply = conn
115                        .xread_options(&[key], &[">"], &opts.group(group_name, consumer_name))
116                        .await
117                        .map_err(|e| {
118                            FaucetError::Source(format!("XREADGROUP failed on '{key}': {e}"))
119                        })?;
120                    let mut got = 0usize;
121                    for stream_key in &reply.keys {
122                        for entry in &stream_key.ids {
123                            records.push(stream_entry_to_json(&entry.id, &entry.map));
124                            got += 1;
125                        }
126                    }
127                    // A short read means Redis returned everything currently
128                    // pending — stop (also breaks at `got == 0`). This also avoids
129                    // spinning against a live producer.
130                    if got < per_read {
131                        break;
132                    }
133                    if let Some(max) = self.config.max_records
134                        && records.len() >= max
135                    {
136                        break;
137                    }
138                }
139            }
140            _ => {
141                // No consumer group: XREAD from `0` returns the whole stream when
142                // no `count` is set; an explicit `count` is the caller's own cap.
143                let mut opts = redis::streams::StreamReadOptions::default();
144                if let Some(c) = count {
145                    opts = opts.count(*c);
146                }
147                let reply: redis::streams::StreamReadReply = conn
148                    .xread_options(&[key], &["0"], &opts)
149                    .await
150                    .map_err(|e| FaucetError::Source(format!("XREAD failed on '{key}': {e}")))?;
151                for stream_key in &reply.keys {
152                    for entry in &stream_key.ids {
153                        records.push(stream_entry_to_json(&entry.id, &entry.map));
154                    }
155                }
156            }
157        }
158
159        Ok(records)
160    }
161
162    /// Scan for keys matching a pattern, then MGET all keys in a single round-trip.
163    async fn fetch_keys(
164        &self,
165        conn: &mut redis::aio::MultiplexedConnection,
166        pattern: &str,
167    ) -> Result<Vec<Value>, FaucetError> {
168        let keys: Vec<String> = {
169            let mut collected = Vec::new();
170            let mut iter: redis::AsyncIter<String> =
171                conn.scan_match(pattern).await.map_err(|e| {
172                    FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}"))
173                })?;
174
175            while let Some(key) = iter.next_item().await {
176                collected.push(key);
177            }
178            collected
179        };
180
181        if keys.is_empty() {
182            return Ok(Vec::new());
183        }
184
185        let values: Vec<Option<String>> = redis::cmd("MGET")
186            .arg(&keys)
187            .query_async(conn)
188            .await
189            .map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
190
191        let mut records = Vec::new();
192        for (key, value) in keys.iter().zip(values.into_iter()) {
193            if let Some(v) = value {
194                let parsed =
195                    serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
196                records.push(json!({
197                    "key": key,
198                    "value": parsed,
199                }));
200            }
201        }
202
203        Ok(records)
204    }
205}
206
207/// Convert a single XRANGE/XREAD stream entry into the JSON record shape used
208/// by both [`RedisSource::fetch_all`] and [`RedisSource::stream_pages`].
209fn stream_entry_to_json(id: &str, map: &std::collections::HashMap<String, redis::Value>) -> Value {
210    let mut fields = serde_json::Map::new();
211    for (field_name, field_value) in map {
212        let val = match field_value {
213            redis::Value::BulkString(bytes) => {
214                let s = String::from_utf8_lossy(bytes);
215                serde_json::from_str::<Value>(&s).unwrap_or_else(|_| Value::String(s.into_owned()))
216            }
217            redis::Value::SimpleString(s) => {
218                serde_json::from_str::<Value>(s).unwrap_or_else(|_| Value::String(s.clone()))
219            }
220            redis::Value::Int(n) => json!(n),
221            redis::Value::Double(n) => json!(n),
222            redis::Value::Boolean(b) => json!(b),
223            redis::Value::Nil => Value::Null,
224            other => Value::String(format!("{other:?}")),
225        };
226        fields.insert(field_name.clone(), val);
227    }
228    json!({
229        "id": id,
230        "fields": Value::Object(fields),
231    })
232}
233
234/// Parse a Redis stream entry ID (`ms-seq`) and return the immediate
235/// successor ID, used to advance the `start` argument of the next `XRANGE`
236/// call without re-emitting the last entry of the previous page.
237fn next_stream_id(id: &str) -> String {
238    // Stream IDs are `<ms>-<seq>`. The "next" ID after `a-b` is `a-(b+1)`,
239    // wrapping to `(a+1)-0` on `u64::MAX` (which we treat as terminal).
240    if let Some((ms, seq)) = id.split_once('-')
241        && let (Ok(ms), Ok(seq)) = (ms.parse::<u64>(), seq.parse::<u64>())
242    {
243        return match seq.checked_add(1) {
244            Some(next_seq) => format!("{ms}-{next_seq}"),
245            None => format!("{}-0", ms.saturating_add(1)),
246        };
247    }
248    // Fall back to appending `\x00` — XRANGE treats this as "just after".
249    // Reachable only if Redis ever returns a malformed ID, which it does not
250    // in practice, but we degrade safely.
251    format!("{id}\u{0}")
252}
253
254#[async_trait]
255impl faucet_core::Source for RedisSource {
256    async fn fetch_with_context(
257        &self,
258        context: &std::collections::HashMap<String, serde_json::Value>,
259    ) -> Result<Vec<Value>, FaucetError> {
260        if context.is_empty() {
261            return RedisSource::fetch_all(self).await;
262        }
263
264        let mut conn = self.connection().await?;
265
266        // Substitute context into the key/pattern of each source type variant.
267        let mut records = match &self.config.source_type {
268            RedisSourceType::List { key } => {
269                let resolved_key = faucet_core::util::substitute_context(key, context);
270                self.fetch_list(&mut conn, &resolved_key).await?
271            }
272            RedisSourceType::Stream {
273                key,
274                group,
275                consumer,
276                count,
277            } => {
278                let resolved_key = faucet_core::util::substitute_context(key, context);
279                self.fetch_stream(&mut conn, &resolved_key, group, consumer, count)
280                    .await?
281            }
282            RedisSourceType::Keys { pattern } => {
283                let resolved_pattern = faucet_core::util::substitute_context(pattern, context);
284                self.fetch_keys(&mut conn, &resolved_pattern).await?
285            }
286        };
287
288        if let Some(max) = self.config.max_records {
289            records.truncate(max);
290        }
291
292        tracing::info!(
293            records = records.len(),
294            "Redis fetch complete (with context)"
295        );
296        Ok(records)
297    }
298
299    /// Stream records page-by-page so the pipeline can write to the sink as
300    /// pages arrive instead of buffering the full result set. Each mode maps
301    /// [`RedisSourceConfig::batch_size`] onto its native paging primitive
302    /// (see the type-level doc on [`RedisSourceConfig::batch_size`]).
303    ///
304    /// The trait-level `batch_size` argument is ignored in favour of the
305    /// config field — the config is the user-facing knob the README
306    /// documents, and routing the pipeline-supplied hint through it would
307    /// silently override an explicit config value.
308    ///
309    /// `batch_size = 0` drains the underlying primitive into a single page.
310    /// The Redis source has no incremental-replication mode today, so every
311    /// emitted page carries `bookmark: None`.
312    fn stream_pages<'a>(
313        &'a self,
314        context: &'a std::collections::HashMap<String, Value>,
315        _batch_size: usize,
316    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
317        let batch_size = self.config.batch_size;
318        let max_records = self.config.max_records;
319
320        Box::pin(async_stream::try_stream! {
321            let mut conn = self.connection().await?;
322
323            let mut emitted: usize = 0;
324
325            match &self.config.source_type {
326                RedisSourceType::List { key } => {
327                    let resolved = if context.is_empty() {
328                        key.clone()
329                    } else {
330                        faucet_core::util::substitute_context(key, context)
331                    };
332                    let pages = stream_list(&mut conn, &resolved, batch_size, max_records);
333                    futures::pin_mut!(pages);
334                    while let Some(page) = futures::StreamExt::next(&mut pages).await {
335                        let page = page?;
336                        emitted += page.records.len();
337                        yield page;
338                    }
339                }
340                RedisSourceType::Stream { key, .. } => {
341                    // Streaming intentionally uses XRANGE — consumer-group
342                    // semantics (XREADGROUP) don't compose with "drain to a
343                    // bookmarked checkpoint" because acknowledgement state
344                    // would have to be deferred until the sink succeeds, and
345                    // the source has no incremental mode today.
346                    let resolved = if context.is_empty() {
347                        key.clone()
348                    } else {
349                        faucet_core::util::substitute_context(key, context)
350                    };
351                    let pages = stream_xrange(&mut conn, &resolved, batch_size, max_records);
352                    futures::pin_mut!(pages);
353                    while let Some(page) = futures::StreamExt::next(&mut pages).await {
354                        let page = page?;
355                        emitted += page.records.len();
356                        yield page;
357                    }
358                }
359                RedisSourceType::Keys { pattern } => {
360                    let resolved = if context.is_empty() {
361                        pattern.clone()
362                    } else {
363                        faucet_core::util::substitute_context(pattern, context)
364                    };
365                    let pages = stream_keys(&mut conn, &resolved, batch_size, max_records);
366                    futures::pin_mut!(pages);
367                    while let Some(page) = futures::StreamExt::next(&mut pages).await {
368                        let page = page?;
369                        emitted += page.records.len();
370                        yield page;
371                    }
372                }
373            }
374
375            tracing::info!(
376                records = emitted,
377                batch_size,
378                "Redis source stream complete",
379            );
380        })
381    }
382
383    fn config_schema(&self) -> serde_json::Value {
384        serde_json::to_value(faucet_core::schema_for!(RedisSourceConfig))
385            .expect("schema serialization")
386    }
387}
388
389/// Stream a Redis list via `LRANGE start stop`, sliding the window by
390/// `batch_size`. With `batch_size == 0`, drains the list in a single
391/// `LRANGE 0 -1` round-trip.
392///
393/// **Consistency caveat (#78 LOW):** index-based `LRANGE` paging is only
394/// stable if the list is not mutated mid-scan. A concurrent `LPUSH` / `LPOP`
395/// shifts every element's index, so a writer pushing/popping while this drains
396/// can make the source skip or duplicate elements across page boundaries. For
397/// a queue-style workload where the list is being consumed concurrently,
398/// prefer a Redis Stream (`XRANGE`/consumer groups) over a list.
399fn stream_list<'a>(
400    conn: &'a mut redis::aio::MultiplexedConnection,
401    key: &'a str,
402    batch_size: usize,
403    max_records: Option<usize>,
404) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
405    async_stream::try_stream! {
406        if batch_size == 0 {
407            let values: Vec<String> = conn
408                .lrange(key, 0, -1)
409                .await
410                .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
411            let mut records: Vec<Value> = values
412                .into_iter()
413                .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
414                .collect();
415            if let Some(max) = max_records {
416                records.truncate(max);
417            }
418            yield StreamPage { records, bookmark: None };
419            return;
420        }
421
422        let mut start: isize = 0;
423        let mut emitted: usize = 0;
424        loop {
425            let stop: isize = start + batch_size as isize - 1;
426            let values: Vec<String> = conn
427                .lrange(key, start, stop)
428                .await
429                .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
430            if values.is_empty() {
431                break;
432            }
433            let mut records: Vec<Value> = values
434                .into_iter()
435                .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
436                .collect();
437            let returned = records.len();
438            // Respect max_records — truncate the final page and stop.
439            let mut stop_after_yield = false;
440            if let Some(max) = max_records
441                && emitted + records.len() >= max
442            {
443                records.truncate(max - emitted);
444                stop_after_yield = true;
445            }
446            emitted += records.len();
447            yield StreamPage { records, bookmark: None };
448            if stop_after_yield || returned < batch_size {
449                break;
450            }
451            start += batch_size as isize;
452        }
453    }
454}
455
456/// Stream a Redis stream via `XRANGE start + COUNT batch_size`, advancing the
457/// start ID on each page. With `batch_size == 0`, drains via a single
458/// `XRANGE - +` round-trip.
459fn stream_xrange<'a>(
460    conn: &'a mut redis::aio::MultiplexedConnection,
461    key: &'a str,
462    batch_size: usize,
463    max_records: Option<usize>,
464) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
465    async_stream::try_stream! {
466        if batch_size == 0 {
467            let reply: redis::streams::StreamRangeReply = conn
468                .xrange_all(key)
469                .await
470                .map_err(|e| FaucetError::Source(format!("XRANGE failed on '{key}': {e}")))?;
471            let mut records: Vec<Value> = reply
472                .ids
473                .iter()
474                .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
475                .collect();
476            if let Some(max) = max_records {
477                records.truncate(max);
478            }
479            yield StreamPage { records, bookmark: None };
480            return;
481        }
482
483        let mut start: String = "-".to_string();
484        let mut emitted: usize = 0;
485        loop {
486            let reply: redis::streams::StreamRangeReply = conn
487                .xrange_count(key, &start, "+", batch_size)
488                .await
489                .map_err(|e| FaucetError::Source(format!("XRANGE failed on '{key}': {e}")))?;
490
491            if reply.ids.is_empty() {
492                break;
493            }
494
495            // Capture the last returned ID before consuming the reply so we
496            // can advance the cursor (`next_stream_id`) without re-emitting it.
497            let last_id = reply
498                .ids
499                .last()
500                .expect("non-empty checked above")
501                .id
502                .clone();
503            let returned = reply.ids.len();
504            let mut records: Vec<Value> = reply
505                .ids
506                .into_iter()
507                .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
508                .collect();
509
510            let mut stop_after_yield = false;
511            if let Some(max) = max_records
512                && emitted + records.len() >= max
513            {
514                records.truncate(max - emitted);
515                stop_after_yield = true;
516            }
517            emitted += records.len();
518            yield StreamPage { records, bookmark: None };
519
520            if stop_after_yield || returned < batch_size {
521                break;
522            }
523            start = next_stream_id(&last_id);
524        }
525    }
526}
527
528/// Stream keys matching `pattern`. The `SCAN` cursor is iterated server-side
529/// (with `COUNT` set to a sensible hint), keys are buffered up to
530/// `batch_size`, then `MGET`'d in one round-trip per page. With
531/// `batch_size == 0`, drains the entire scan and emits one page after a
532/// single `MGET`.
533fn stream_keys<'a>(
534    conn: &'a mut redis::aio::MultiplexedConnection,
535    pattern: &'a str,
536    batch_size: usize,
537    max_records: Option<usize>,
538) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
539    use faucet_core::DEFAULT_BATCH_SIZE;
540    async_stream::try_stream! {
541        // Drive the SCAN cursor manually (one `SCAN cursor MATCH .. COUNT ..`
542        // round-trip at a time) rather than via the buffering `AsyncIter`, so
543        // we can MGET + yield a page as soon as `batch_size` keys accumulate
544        // instead of materialising the entire matched keyset first (#78 LOW).
545        // SCAN COUNT is only a per-round-trip hint; a call may return more or
546        // fewer keys than the hint, so we still buffer until a full page.
547        let scan_hint = if batch_size == 0 { DEFAULT_BATCH_SIZE } else { batch_size };
548        // `batch_size == 0` is the "no batching" sentinel — accumulate the
549        // whole scan and emit one page (still one MGET).
550        let chunk_size = if batch_size == 0 { usize::MAX } else { batch_size };
551        let cap = max_records.unwrap_or(usize::MAX);
552
553        let mut cursor: u64 = 0;
554        let mut buffer: Vec<String> = Vec::new();
555        let mut emitted: usize = 0;
556
557        'scan: loop {
558            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
559                .arg(cursor)
560                .arg("MATCH")
561                .arg(pattern)
562                .arg("COUNT")
563                .arg(scan_hint)
564                .query_async(conn)
565                .await
566                .map_err(|e| FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}")))?;
567            cursor = next_cursor;
568            buffer.extend(keys);
569
570            // Flush as many full pages as the buffer now holds.
571            while emitted < cap && buffer.len() >= chunk_size {
572                let take = chunk_size.min(cap - emitted);
573                let page_keys: Vec<String> = buffer.drain(..take).collect();
574                let records = mget_records(conn, &page_keys).await?;
575                emitted += records.len();
576                yield StreamPage { records, bookmark: None };
577            }
578
579            if cursor == 0 || emitted >= cap {
580                break 'scan;
581            }
582        }
583
584        // Trailing partial page (and the single page in the batch_size==0 case).
585        if emitted < cap && !buffer.is_empty() {
586            let take = (cap - emitted).min(buffer.len());
587            let page_keys: Vec<String> = buffer.drain(..take).collect();
588            let records = mget_records(conn, &page_keys).await?;
589            yield StreamPage { records, bookmark: None };
590        }
591    }
592}
593
594/// `MGET` a slice of keys and pair them with their values via
595/// [`collect_kv_records`].
596async fn mget_records(
597    conn: &mut redis::aio::MultiplexedConnection,
598    keys: &[String],
599) -> Result<Vec<Value>, FaucetError> {
600    let values: Vec<Option<String>> = redis::cmd("MGET")
601        .arg(keys)
602        .query_async(conn)
603        .await
604        .map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
605    Ok(collect_kv_records(keys, values))
606}
607
608/// Pair `keys` with their `MGET`-returned values into `{ "key", "value" }`
609/// records. Missing values (deleted between `SCAN` and `MGET`) are dropped,
610/// matching [`RedisSource::fetch_keys`].
611fn collect_kv_records(keys: &[String], values: Vec<Option<String>>) -> Vec<Value> {
612    keys.iter()
613        .zip(values)
614        .filter_map(|(key, value)| {
615            value.map(|v| {
616                let parsed =
617                    serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
618                json!({ "key": key, "value": parsed })
619            })
620        })
621        .collect()
622}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627    use crate::config::RedisSourceConfig;
628
629    #[test]
630    fn creates_source() {
631        let config = RedisSourceConfig::new(
632            "redis://localhost",
633            RedisSourceType::List { key: "test".into() },
634        );
635        let _source = RedisSource::new(config).unwrap();
636    }
637
638    #[test]
639    fn new_rejects_out_of_range_batch_size() {
640        let mut config = RedisSourceConfig::new(
641            "redis://localhost",
642            RedisSourceType::List { key: "test".into() },
643        );
644        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;
645        match RedisSource::new(config) {
646            Err(FaucetError::Config(m)) => assert!(m.contains("batch_size"), "got: {m}"),
647            other => panic!(
648                "expected a batch_size Config error, got {:?}",
649                other.is_ok()
650            ),
651        }
652    }
653
654    #[test]
655    fn next_stream_id_increments_sequence() {
656        assert_eq!(next_stream_id("1234-0"), "1234-1");
657        assert_eq!(next_stream_id("1234-99"), "1234-100");
658    }
659
660    #[test]
661    fn next_stream_id_wraps_seq_overflow() {
662        let id = format!("5-{}", u64::MAX);
663        assert_eq!(next_stream_id(&id), "6-0");
664    }
665
666    #[test]
667    fn next_stream_id_falls_back_on_malformed_id() {
668        // Not a real Redis ID — fallback path appends NUL.
669        let next = next_stream_id("not-a-real-id");
670        assert!(next.starts_with("not-a-real-id"));
671        assert!(next.ends_with('\u{0}'));
672    }
673
674    #[test]
675    fn stream_entry_to_json_extracts_id_and_fields() {
676        let mut map = std::collections::HashMap::new();
677        map.insert(
678            "field1".to_string(),
679            redis::Value::BulkString(b"value1".to_vec()),
680        );
681        map.insert("field2".to_string(), redis::Value::Int(42));
682        let json = stream_entry_to_json("100-0", &map);
683        assert_eq!(json["id"], "100-0");
684        assert_eq!(json["fields"]["field1"], "value1");
685        assert_eq!(json["fields"]["field2"], 42);
686    }
687}