1use std::time::{SystemTime, UNIX_EPOCH};
2
3use combine::error::StreamError;
4use combine::{Parser, attempt, choice, easy, many, many1};
5use indexmap::indexmap;
6use redis_protocol::resp3::types::BytesFrame;
7use sierradb::StreamId;
8use sierradb::bucket::PartitionId;
9use sierradb::database::{NewEvent, Transaction};
10use sierradb::id::{uuid_to_partition_hash, uuid_v7_with_partition_hash};
11use sierradb_cluster::write::execute::ExecuteTransaction;
12use sierradb_protocol::{ErrorCode, ExpectedVersion};
13use smallvec::SmallVec;
14use uuid::Uuid;
15
16use crate::error::MapRedisError;
17use crate::parser::{
18 FrameStream, data, event_id, expected_version, keyword, number_u64, partition_key, stream_id,
19 string,
20};
21use crate::request::{HandleRequest, array, map, number, simple_str};
22use crate::server::Conn;
23
24#[derive(Clone, Debug, Default, PartialEq, Eq)]
52pub struct EMAppend {
53 pub partition_key: Uuid,
54 pub events: Vec<Event>,
55}
56
57#[derive(Clone, Debug, Default, PartialEq, Eq)]
58pub struct Event {
59 pub stream_id: StreamId,
60 pub event_name: String,
61 pub event_id: Option<Uuid>,
62 pub expected_version: ExpectedVersion,
63 pub timestamp: Option<u64>,
64 pub payload: Vec<u8>,
65 pub metadata: Vec<u8>,
66}
67
68impl Event {
69 fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = Event> + 'a {
70 (
71 stream_id(),
72 string().expected("event name"),
73 many::<Vec<_>, _, _>(OptionalArg::parser()),
74 )
75 .and_then(|(stream_id, event_name, args)| {
76 let mut cmd = Event {
77 stream_id,
78 event_name: event_name.to_string(),
79 ..Default::default()
80 };
81
82 for arg in args {
83 match arg {
84 OptionalArg::EventId(event_id) => {
85 if cmd.event_id.is_some() {
86 return Err(easy::Error::message_format(
87 "event id already specified",
88 ));
89 }
90
91 cmd.event_id = Some(event_id);
92 }
93 OptionalArg::ExpectedVersion(expected_version) => {
94 if !matches!(cmd.expected_version, ExpectedVersion::Any) {
95 return Err(easy::Error::message_format(
96 "expected version already specified",
97 ));
98 }
99
100 cmd.expected_version = expected_version;
101 }
102 OptionalArg::Timestamp(timestamp) => {
103 if cmd.timestamp.is_some() {
104 return Err(easy::Error::message_format(
105 "timestamp already specified",
106 ));
107 }
108
109 cmd.timestamp = Some(timestamp);
110 }
111 OptionalArg::Payload(payload) => {
112 if !cmd.payload.is_empty() {
113 return Err(easy::Error::message_format(
114 "payload already specified",
115 ));
116 }
117
118 cmd.payload = payload.to_vec();
119 }
120 OptionalArg::Metadata(metadata) => {
121 if !cmd.metadata.is_empty() {
122 return Err(easy::Error::message_format(
123 "metadata already specified",
124 ));
125 }
126
127 cmd.metadata = metadata.to_vec();
128 }
129 }
130 }
131
132 Ok(cmd)
133 })
134 }
135}
136
137#[derive(Debug, Clone, PartialEq)]
138enum OptionalArg<'a> {
139 EventId(Uuid),
140 ExpectedVersion(ExpectedVersion),
141 Timestamp(u64),
142 Payload(&'a [u8]),
143 Metadata(&'a [u8]),
144}
145
146impl<'a> OptionalArg<'a> {
147 fn parser() -> impl Parser<FrameStream<'a>, Output = OptionalArg<'a>> + 'a {
148 let event_id = keyword("EVENT_ID")
149 .with(event_id())
150 .map(OptionalArg::EventId);
151 let expected_version = keyword("EXPECTED_VERSION")
152 .with(expected_version())
153 .map(OptionalArg::ExpectedVersion);
154 let timestamp = keyword("TIMESTAMP")
155 .with(number_u64())
156 .map(OptionalArg::Timestamp);
157 let payload = keyword("PAYLOAD").with(data()).map(OptionalArg::Payload);
158 let metadata = keyword("METADATA").with(data()).map(OptionalArg::Metadata);
159
160 choice!(
161 attempt(event_id),
162 attempt(expected_version),
163 attempt(timestamp),
164 attempt(payload),
165 attempt(metadata)
166 )
167 }
168}
169
170impl EMAppend {
171 pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EMAppend> + 'a {
172 (partition_key(), many1::<Vec<_>, _, _>(Event::parser())).map(|(partition_key, events)| {
173 EMAppend {
174 partition_key,
175 events,
176 }
177 })
178 }
179}
180
181impl HandleRequest for EMAppend {
182 type Error = String;
183 type Ok = EMAppendResp;
184
185 async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
186 let partition_hash = uuid_to_partition_hash(self.partition_key);
187 let partition_id = partition_hash % conn.num_partitions;
188 let now = SystemTime::now()
189 .duration_since(UNIX_EPOCH)
190 .map_redis_err()?
191 .as_nanos() as u64;
192
193 let events: SmallVec<[_; 4]> = self.events
194 .into_iter()
195 .map(|event| {
196 let event_id = event
197 .event_id
198 .unwrap_or_else(|| uuid_v7_with_partition_hash(partition_hash));
199 let timestamp = event
200 .timestamp
201 .map(|timestamp| {
202 timestamp.checked_mul(1_000_000).ok_or(
203 ErrorCode::InvalidArg
204 .with_message(
205 "invalid timestamp format: expected milliseconds, got nanoseconds",
206 )
207 .to_string(),
208 )
209 })
210 .transpose()?
211 .unwrap_or(now);
212 Ok(NewEvent {
213 event_id,
214 stream_id: event.stream_id,
215 stream_version: event.expected_version,
216 event_name: event.event_name,
217 timestamp,
218 metadata: event.metadata,
219 payload: event.payload,
220 })
221 })
222 .collect::<Result<_, String>>()?;
223 let event_ids_timestamps: SmallVec<[_; 4]> = events
224 .iter()
225 .map(|event| (event.event_id, event.timestamp))
226 .collect();
227
228 let transaction = Transaction::new(self.partition_key, partition_id, events)
229 .map_err(|err| ErrorCode::InvalidArg.with_message(err))?;
230
231 let result = conn
232 .cluster_ref
233 .ask(ExecuteTransaction::new(transaction))
234 .await
235 .map_redis_err()?;
236
237 let stream_versions = result.stream_versions.into_iter();
238 let events = event_ids_timestamps
239 .into_iter()
240 .zip(stream_versions)
241 .map(
242 |((event_id, timestamp), (stream_id, stream_version))| EventInfo {
243 event_id,
244 stream_id,
245 stream_version,
246 timestamp,
247 },
248 )
249 .collect();
250
251 Ok(Some(EMAppendResp {
252 partition_key: self.partition_key,
253 partition_id,
254 first_partition_sequence: result.first_partition_sequence,
255 last_partition_sequence: result.last_partition_sequence,
256 events,
257 }))
258 }
259}
260
261pub struct EMAppendResp {
262 partition_key: Uuid,
263 partition_id: PartitionId,
264 first_partition_sequence: u64,
265 last_partition_sequence: u64,
266 events: Vec<EventInfo>,
267}
268
269struct EventInfo {
270 event_id: Uuid,
271 stream_id: StreamId,
272 stream_version: u64,
273 timestamp: u64,
274}
275
276impl From<EMAppendResp> for BytesFrame {
277 fn from(resp: EMAppendResp) -> Self {
278 map(indexmap! {
279 simple_str("partition_key") => simple_str(resp.partition_key.to_string()),
280 simple_str("partition_id") => number(resp.partition_id as i64),
281 simple_str("first_partition_sequence") => number(resp.first_partition_sequence as i64),
282 simple_str("last_partition_sequence") => number(resp.last_partition_sequence as i64),
283 simple_str("events") => array(resp.events.into_iter().map(BytesFrame::from).collect()),
284 })
285 }
286}
287
288impl From<EventInfo> for BytesFrame {
289 fn from(info: EventInfo) -> Self {
290 map(indexmap! {
291 simple_str("event_id") => simple_str(info.event_id.to_string()),
292 simple_str("stream_id") => simple_str(info.stream_id.to_string()),
293 simple_str("stream_version") => number(info.stream_version as i64),
294 simple_str("timestamp") => number((info.timestamp / 1_000_000) as i64),
295 })
296 }
297}