sierradb_server/request/
escan.rs1use combine::error::StreamError;
2use combine::{Parser, attempt, choice, easy, many};
3use indexmap::indexmap;
4use redis_protocol::resp3::types::BytesFrame;
5use sierradb::StreamId;
6use sierradb::bucket::segment::EventRecord;
7use sierradb::id::{NAMESPACE_PARTITION_KEY, uuid_to_partition_hash};
8use sierradb_cluster::read::ReadStream;
9use sierradb_protocol::ErrorCode;
10use uuid::Uuid;
11
12use crate::error::MapRedisError;
13use crate::parser::{FrameStream, keyword, number_u64, partition_key, range_value, stream_id};
14use crate::request::{HandleRequest, RangeValue, array, encode_event, map, simple_str};
15use crate::server::Conn;
16
17pub struct EScan {
37 pub stream_id: StreamId,
38 pub start_version: RangeValue,
39 pub end_version: RangeValue,
40 pub partition_key: Option<Uuid>,
41 pub count: Option<u64>,
42}
43
44impl EScan {
45 pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EScan> + 'a {
46 (
47 stream_id(),
48 range_value(),
49 range_value(),
50 many::<Vec<_>, _, _>(OptionalArg::parser()),
51 )
52 .and_then(|(stream_id, start_version, end_version, args)| {
53 let mut cmd = EScan {
54 stream_id,
55 start_version,
56 end_version,
57 partition_key: None,
58 count: None,
59 };
60
61 for arg in args {
62 match arg {
63 OptionalArg::PartitionKey(partition_key) => {
64 if cmd.partition_key.is_some() {
65 return Err(easy::Error::message_format(
66 "partition key already specified",
67 ));
68 }
69
70 cmd.partition_key = Some(partition_key);
71 }
72 OptionalArg::Count(count) => {
73 if cmd.count.is_some() {
74 return Err(easy::Error::message_format("count already specified"));
75 }
76
77 cmd.count = Some(count);
78 }
79 }
80 }
81
82 Ok(cmd)
83 })
84 }
85}
86
87#[derive(Debug, Clone, PartialEq)]
88enum OptionalArg {
89 PartitionKey(Uuid),
90 Count(u64),
91}
92
93impl OptionalArg {
94 fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = OptionalArg> + 'a {
95 let partition_key = keyword("PARTITION_KEY")
96 .with(partition_key())
97 .map(OptionalArg::PartitionKey);
98 let count = keyword("COUNT").with(number_u64()).map(OptionalArg::Count);
99
100 choice!(attempt(partition_key), attempt(count))
101 }
102}
103
104impl HandleRequest for EScan {
105 type Error = String;
106 type Ok = EScanResp;
107
108 async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
109 let partition_key = self
110 .partition_key
111 .unwrap_or_else(|| Uuid::new_v5(&NAMESPACE_PARTITION_KEY, self.stream_id.as_bytes()));
112 let partition_hash = uuid_to_partition_hash(partition_key);
113 let partition_id = partition_hash % conn.num_partitions;
114
115 let start_version = match self.start_version {
116 RangeValue::Start => 0,
117 RangeValue::End => {
118 return Err(ErrorCode::InvalidArg.with_message("start version cannot be '+'"));
119 }
120 RangeValue::Value(n) => n,
121 };
122
123 let end_version = match self.end_version {
124 RangeValue::Start => {
125 return Err(ErrorCode::InvalidArg.with_message("end version cannot be '-'"));
126 }
127 RangeValue::End => None,
128 RangeValue::Value(n) => Some(n),
129 };
130
131 let records = conn
132 .cluster_ref
133 .ask(ReadStream {
134 stream_id: self.stream_id,
135 partition_id,
136 start_version,
137 end_version,
138 count: self.count.unwrap_or(100),
139 })
140 .await
141 .map_redis_err()?;
142
143 Ok(Some(EScanResp {
144 has_more: records.has_more,
145 events: records.events,
146 }))
147 }
148}
149
150pub struct EScanResp {
151 has_more: bool,
152 events: Vec<EventRecord>,
153}
154
155impl From<EScanResp> for BytesFrame {
156 fn from(resp: EScanResp) -> Self {
157 map(indexmap! {
158 simple_str("has_more") => resp.has_more.into(),
159 simple_str("events") => array(resp.events.into_iter().map(encode_event).collect()),
160 })
161 }
162}