Skip to main content

sierradb_server/request/
escan.rs

1use combine::error::StreamError;
2use combine::{Parser, attempt, choice, easy, many};
3use indexmap::indexmap;
4use redis_protocol::resp3::types::BytesFrame;
5use sierradb::StreamId;
6use sierradb::bucket::segment::EventRecord;
7use sierradb::id::{NAMESPACE_PARTITION_KEY, uuid_to_partition_hash};
8use sierradb_cluster::read::ReadStream;
9use sierradb_protocol::ErrorCode;
10use uuid::Uuid;
11
12use crate::error::MapRedisError;
13use crate::parser::{FrameStream, keyword, number_u64, partition_key, range_value, stream_id};
14use crate::request::{HandleRequest, RangeValue, array, encode_event, map, simple_str};
15use crate::server::Conn;
16
17/// Scan events in a stream by version range.
18///
19/// # Syntax
20/// ```text
21/// ESCAN <stream_id> <start_version> <end_version> [PARTITION_KEY <partition_key>] [COUNT <count>]
22/// ```
23///
24/// # Parameters
25/// - `stream_id`: Stream identifier to scan
26/// - `start_version`: Starting version number (use "-" for beginning)
27/// - `end_version`: Ending version number (use "+" for end, or specific number)
28/// - `partition_key` (optional): UUID to scan specific partition
29/// - `count` (optional): Maximum number of events to return
30///
31/// # Examples
32/// ```text
33/// ESCAN my-stream 0 100 COUNT 50
34/// ESCAN my-stream - + PARTITION_KEY 550e8400-e29b-41d4-a716-446655440000
35/// ```
36pub struct EScan {
37    pub stream_id: StreamId,
38    pub start_version: RangeValue,
39    pub end_version: RangeValue,
40    pub partition_key: Option<Uuid>,
41    pub count: Option<u64>,
42}
43
44impl EScan {
45    pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EScan> + 'a {
46        (
47            stream_id(),
48            range_value(),
49            range_value(),
50            many::<Vec<_>, _, _>(OptionalArg::parser()),
51        )
52            .and_then(|(stream_id, start_version, end_version, args)| {
53                let mut cmd = EScan {
54                    stream_id,
55                    start_version,
56                    end_version,
57                    partition_key: None,
58                    count: None,
59                };
60
61                for arg in args {
62                    match arg {
63                        OptionalArg::PartitionKey(partition_key) => {
64                            if cmd.partition_key.is_some() {
65                                return Err(easy::Error::message_format(
66                                    "partition key already specified",
67                                ));
68                            }
69
70                            cmd.partition_key = Some(partition_key);
71                        }
72                        OptionalArg::Count(count) => {
73                            if cmd.count.is_some() {
74                                return Err(easy::Error::message_format("count already specified"));
75                            }
76
77                            cmd.count = Some(count);
78                        }
79                    }
80                }
81
82                Ok(cmd)
83            })
84    }
85}
86
87#[derive(Debug, Clone, PartialEq)]
88enum OptionalArg {
89    PartitionKey(Uuid),
90    Count(u64),
91}
92
93impl OptionalArg {
94    fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = OptionalArg> + 'a {
95        let partition_key = keyword("PARTITION_KEY")
96            .with(partition_key())
97            .map(OptionalArg::PartitionKey);
98        let count = keyword("COUNT").with(number_u64()).map(OptionalArg::Count);
99
100        choice!(attempt(partition_key), attempt(count))
101    }
102}
103
104impl HandleRequest for EScan {
105    type Error = String;
106    type Ok = EScanResp;
107
108    async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
109        let partition_key = self
110            .partition_key
111            .unwrap_or_else(|| Uuid::new_v5(&NAMESPACE_PARTITION_KEY, self.stream_id.as_bytes()));
112        let partition_hash = uuid_to_partition_hash(partition_key);
113        let partition_id = partition_hash % conn.num_partitions;
114
115        let start_version = match self.start_version {
116            RangeValue::Start => 0,
117            RangeValue::End => {
118                return Err(ErrorCode::InvalidArg.with_message("start version cannot be '+'"));
119            }
120            RangeValue::Value(n) => n,
121        };
122
123        let end_version = match self.end_version {
124            RangeValue::Start => {
125                return Err(ErrorCode::InvalidArg.with_message("end version cannot be '-'"));
126            }
127            RangeValue::End => None,
128            RangeValue::Value(n) => Some(n),
129        };
130
131        let records = conn
132            .cluster_ref
133            .ask(ReadStream {
134                stream_id: self.stream_id,
135                partition_id,
136                start_version,
137                end_version,
138                count: self.count.unwrap_or(100),
139            })
140            .await
141            .map_redis_err()?;
142
143        Ok(Some(EScanResp {
144            has_more: records.has_more,
145            events: records.events,
146        }))
147    }
148}
149
150pub struct EScanResp {
151    has_more: bool,
152    events: Vec<EventRecord>,
153}
154
155impl From<EScanResp> for BytesFrame {
156    fn from(resp: EScanResp) -> Self {
157        map(indexmap! {
158            simple_str("has_more") => resp.has_more.into(),
159            simple_str("events") => array(resp.events.into_iter().map(encode_event).collect()),
160        })
161    }
162}