Skip to main content

sierradb_server/request/
epscan.rs

1use combine::error::StreamError;
2use combine::{Parser, easy, many};
3use indexmap::indexmap;
4use redis_protocol::resp3::types::BytesFrame;
5use sierradb::bucket::segment::EventRecord;
6use sierradb_cluster::read::ReadPartition;
7use sierradb_protocol::ErrorCode;
8
9use crate::error::MapRedisError;
10use crate::parser::{FrameStream, keyword, number_u64, partition_selector, range_value};
11use crate::request::{
12    HandleRequest, PartitionSelector, RangeValue, array, encode_event, map, simple_str,
13};
14use crate::server::Conn;
15
16/// Scan events in a partition by sequence number range.
17///
18/// # Syntax
19/// ```text
20/// EPSCAN <partition> <start_sequence> <end_sequence> [COUNT <count>]
21/// ```
22///
23/// # Parameters
24/// - `partition`: Partition selector (partition ID 0-65535 or UUID key)
25/// - `start_sequence`: Starting sequence number (use "-" for beginning)
26/// - `end_sequence`: Ending sequence number (use "+" for end, or specific
27///   number)
28/// - `count` (optional): Maximum number of events to return
29///
30/// # Examples
31/// ```text
32/// EPSCAN 42 100 200 COUNT 50
33/// EPSCAN 550e8400-e29b-41d4-a716-446655440000 - + COUNT 100
34/// ```
35pub struct EPScan {
36    pub partition: PartitionSelector,
37    pub start_sequence: RangeValue,
38    pub end_sequence: RangeValue,
39    pub count: Option<u64>,
40}
41
42impl EPScan {
43    pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EPScan> + 'a {
44        (
45            partition_selector(),
46            range_value(),
47            range_value(),
48            many::<Vec<_>, _, _>(OptionalArg::parser()),
49        )
50            .and_then(|(partition, start_sequence, end_sequence, args)| {
51                let mut cmd = EPScan {
52                    partition,
53                    start_sequence,
54                    end_sequence,
55                    count: None,
56                };
57
58                for arg in args {
59                    match arg {
60                        OptionalArg::Count(count) => {
61                            if cmd.count.is_some() {
62                                return Err(easy::Error::message_format("count already specified"));
63                            }
64
65                            cmd.count = Some(count);
66                        }
67                    }
68                }
69
70                Ok(cmd)
71            })
72    }
73}
74
75#[derive(Debug, Clone, PartialEq)]
76enum OptionalArg {
77    Count(u64),
78}
79
80impl OptionalArg {
81    fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = OptionalArg> + 'a {
82        keyword("COUNT").with(number_u64()).map(OptionalArg::Count)
83    }
84}
85
86impl HandleRequest for EPScan {
87    type Error = String;
88    type Ok = EPScanResp;
89
90    async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
91        let start_sequence = match self.start_sequence {
92            RangeValue::Start => 0,
93            RangeValue::End => {
94                return Err(ErrorCode::InvalidArg.with_message("start sequence cannot be '+'"));
95            }
96            RangeValue::Value(n) => n,
97        };
98        let end_sequence = match self.end_sequence {
99            RangeValue::Start => {
100                return Err(ErrorCode::InvalidArg.with_message("end sequence cannot be '-'"));
101            }
102            RangeValue::End => None,
103            RangeValue::Value(n) => Some(n),
104        };
105
106        let records = conn
107            .cluster_ref
108            .ask(ReadPartition {
109                partition_id: self.partition.into_partition_id(conn.num_partitions),
110                start_sequence,
111                end_sequence,
112                count: self.count.unwrap_or(100),
113            })
114            .await
115            .map_redis_err()?;
116
117        Ok(Some(EPScanResp {
118            has_more: records.has_more,
119            events: records.events,
120        }))
121    }
122}
123
124pub struct EPScanResp {
125    has_more: bool,
126    events: Vec<EventRecord>,
127}
128
129impl From<EPScanResp> for BytesFrame {
130    fn from(resp: EPScanResp) -> Self {
131        map(indexmap! {
132            simple_str("has_more") => resp.has_more.into(),
133            simple_str("events") => array(resp.events.into_iter().map(encode_event).collect()),
134        })
135    }
136}