Skip to main content

kevy_store/stream/
load.rs

1//! Consumer-group exchange types + the `XSETID` scalar setter — the
2//! pieces persistence (snapshot v4, AOF rewrite, reshard's `load_value`
3//! redistribution) needs to carry group/PEL state across a dump/load
4//! boundary. Split from `stream/mod.rs` to stay under the 500-LOC cap.
5
6use std::collections::BTreeMap;
7
8use kevy_map::KevyMap;
9
10use super::group::{ConsumerGroup, ConsumerState, PelEntry};
11use super::{StreamData, StreamId};
12use crate::StoreError;
13use crate::value::SmallBytes;
14
15/// One PEL row in primitive form: `(ms, seq, consumer, delivery_time_ms,
16/// delivery_count)`. The persist crate serializes these verbatim.
17pub type LoadedPelEntry = (u64, u64, Vec<u8>, u64, u32);
18
19/// One consumer group decoded into primitive tuples — the dump/load wire
20/// form shared by snapshot v4, AOF-rewrite filtering, and reshard's
21/// in-memory redistribution.
22pub struct LoadedGroup {
23    /// Group name.
24    pub name: Vec<u8>,
25    /// `last_delivered_id` as `(ms, seq)`.
26    pub last_delivered: (u64, u64),
27    /// `(name, last_seen_ms)` per known consumer. `pel_count` is
28    /// recomputed from `pel` on import.
29    pub consumers: Vec<(Vec<u8>, u64)>,
30    /// Every PEL row, including tombstones (entries XDEL'd while
31    /// pending) — snapshot keeps those; AOF rewrite filters them.
32    pub pel: Vec<LoadedPelEntry>,
33}
34
35impl StreamData {
36    /// Does an entry with `id` currently exist? AOF rewrite uses this
37    /// to filter tombstone PEL rows (XCLAIM can't re-create those).
38    pub fn contains_entry(&self, id: StreamId) -> bool {
39        self.entries.contains_key(&id)
40    }
41
42    /// Dump every group into the primitive exchange form.
43    pub fn export_groups(&self) -> Vec<LoadedGroup> {
44        self.groups
45            .iter()
46            .map(|(name, g)| LoadedGroup {
47                name: name.to_vec(),
48                last_delivered: (g.last_delivered_id.ms, g.last_delivered_id.seq),
49                consumers: g
50                    .consumers
51                    .iter()
52                    .map(|(c, cs)| (c.to_vec(), cs.last_seen_ms))
53                    .collect(),
54                pel: g
55                    .pel
56                    .iter()
57                    .map(|(id, p)| {
58                        (id.ms, id.seq, p.consumer.to_vec(), p.delivery_time_ms, p.delivery_count)
59                    })
60                    .collect(),
61            })
62            .collect()
63    }
64
65    /// Rebuild the group map from the exchange form (loader-side twin of
66    /// [`Self::export_groups`]). Per-consumer `pel_count` is recomputed;
67    /// a PEL owner missing from the consumer roster (hand-built or
68    /// corrupt file) gets a roster slot rather than a panic.
69    pub fn import_groups(&mut self, groups: Vec<LoadedGroup>) {
70        for lg in groups {
71            let mut consumers: KevyMap<SmallBytes, Box<ConsumerState>> = KevyMap::default();
72            for (name, last_seen_ms) in lg.consumers {
73                let name = SmallBytes::from_vec(name);
74                consumers.insert(
75                    name.clone(),
76                    Box::new(ConsumerState { name, last_seen_ms, pel_count: 0 }),
77                );
78            }
79            let mut pel: BTreeMap<StreamId, PelEntry> = BTreeMap::new();
80            for (ms, seq, consumer, delivery_time_ms, delivery_count) in lg.pel {
81                let consumer = SmallBytes::from_vec(consumer);
82                if consumers.get(consumer.as_slice()).is_none() {
83                    consumers.insert(
84                        consumer.clone(),
85                        Box::new(ConsumerState {
86                            name: consumer.clone(),
87                            last_seen_ms: 0,
88                            pel_count: 0,
89                        }),
90                    );
91                }
92                if let Some(cs) = consumers.get_mut(consumer.as_slice()) {
93                    cs.pel_count += 1;
94                }
95                pel.insert(StreamId { ms, seq }, PelEntry {
96                    consumer,
97                    delivery_time_ms,
98                    delivery_count,
99                });
100            }
101            self.groups.insert(
102                SmallBytes::from_vec(lg.name),
103                Box::new(ConsumerGroup {
104                    last_delivered_id: StreamId {
105                        ms: lg.last_delivered.0,
106                        seq: lg.last_delivered.1,
107                    },
108                    pel,
109                    consumers,
110                }),
111            );
112        }
113    }
114
115    /// `XSETID key last-id [ENTRIESADDED n] [MAXDELETEDID id]` — overwrite
116    /// the stream's scalar state. Rejects a `last_id` below the current
117    /// top entry (Redis: "smaller than the target stream top item").
118    pub fn xsetid(
119        &mut self,
120        last_id: StreamId,
121        entries_added: Option<u64>,
122        max_deleted_id: Option<StreamId>,
123    ) -> Result<(), StoreError> {
124        if let Some((top, _)) = self.entries.iter().next_back()
125            && last_id < *top
126        {
127            return Err(StoreError::OutOfRange);
128        }
129        self.last_id = last_id;
130        if let Some(n) = entries_added {
131            self.entries_added = n;
132        }
133        if let Some(id) = max_deleted_id {
134            self.max_deleted_id = id;
135        }
136        Ok(())
137    }
138}