sierradb_server/request/
epscan.rs1use combine::error::StreamError;
2use combine::{Parser, easy, many};
3use indexmap::indexmap;
4use redis_protocol::resp3::types::BytesFrame;
5use sierradb::bucket::segment::EventRecord;
6use sierradb_cluster::read::ReadPartition;
7use sierradb_protocol::ErrorCode;
8
9use crate::error::MapRedisError;
10use crate::parser::{FrameStream, keyword, number_u64, partition_selector, range_value};
11use crate::request::{
12 HandleRequest, PartitionSelector, RangeValue, array, encode_event, map, simple_str,
13};
14use crate::server::Conn;
15
16pub struct EPScan {
36 pub partition: PartitionSelector,
37 pub start_sequence: RangeValue,
38 pub end_sequence: RangeValue,
39 pub count: Option<u64>,
40}
41
42impl EPScan {
43 pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EPScan> + 'a {
44 (
45 partition_selector(),
46 range_value(),
47 range_value(),
48 many::<Vec<_>, _, _>(OptionalArg::parser()),
49 )
50 .and_then(|(partition, start_sequence, end_sequence, args)| {
51 let mut cmd = EPScan {
52 partition,
53 start_sequence,
54 end_sequence,
55 count: None,
56 };
57
58 for arg in args {
59 match arg {
60 OptionalArg::Count(count) => {
61 if cmd.count.is_some() {
62 return Err(easy::Error::message_format("count already specified"));
63 }
64
65 cmd.count = Some(count);
66 }
67 }
68 }
69
70 Ok(cmd)
71 })
72 }
73}
74
/// An optional trailing argument recognised after the positional arguments.
#[derive(Debug, Clone, PartialEq)]
enum OptionalArg {
    /// `COUNT <n>`: the number that followed the `COUNT` keyword.
    Count(u64),
}
79
80impl OptionalArg {
81 fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = OptionalArg> + 'a {
82 keyword("COUNT").with(number_u64()).map(OptionalArg::Count)
83 }
84}
85
86impl HandleRequest for EPScan {
87 type Error = String;
88 type Ok = EPScanResp;
89
90 async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
91 let start_sequence = match self.start_sequence {
92 RangeValue::Start => 0,
93 RangeValue::End => {
94 return Err(ErrorCode::InvalidArg.with_message("start sequence cannot be '+'"));
95 }
96 RangeValue::Value(n) => n,
97 };
98 let end_sequence = match self.end_sequence {
99 RangeValue::Start => {
100 return Err(ErrorCode::InvalidArg.with_message("end sequence cannot be '-'"));
101 }
102 RangeValue::End => None,
103 RangeValue::Value(n) => Some(n),
104 };
105
106 let records = conn
107 .cluster_ref
108 .ask(ReadPartition {
109 partition_id: self.partition.into_partition_id(conn.num_partitions),
110 start_sequence,
111 end_sequence,
112 count: self.count.unwrap_or(100),
113 })
114 .await
115 .map_redis_err()?;
116
117 Ok(Some(EPScanResp {
118 has_more: records.has_more,
119 events: records.events,
120 }))
121 }
122}
123
124pub struct EPScanResp {
125 has_more: bool,
126 events: Vec<EventRecord>,
127}
128
129impl From<EPScanResp> for BytesFrame {
130 fn from(resp: EPScanResp) -> Self {
131 map(indexmap! {
132 simple_str("has_more") => resp.has_more.into(),
133 simple_str("events") => array(resp.events.into_iter().map(encode_event).collect()),
134 })
135 }
136}