Skip to main content

sierradb_server/request/
emappend.rs

1use std::time::{SystemTime, UNIX_EPOCH};
2
3use combine::error::StreamError;
4use combine::{Parser, attempt, choice, easy, many, many1};
5use indexmap::indexmap;
6use redis_protocol::resp3::types::BytesFrame;
7use sierradb::StreamId;
8use sierradb::bucket::PartitionId;
9use sierradb::database::{NewEvent, Transaction};
10use sierradb::id::{uuid_to_partition_hash, uuid_v7_with_partition_hash};
11use sierradb_cluster::write::execute::ExecuteTransaction;
12use sierradb_protocol::{ErrorCode, ExpectedVersion};
13use smallvec::SmallVec;
14use uuid::Uuid;
15
16use crate::error::MapRedisError;
17use crate::parser::{
18    FrameStream, data, event_id, expected_version, keyword, number_u64, partition_key, stream_id,
19    string,
20};
21use crate::request::{HandleRequest, array, map, number, simple_str};
22use crate::server::Conn;
23
/// Append multiple events to streams in a single transaction.
///
/// # Syntax
/// ```text
/// EMAPPEND <partition_key> <stream_id1> <event_name1> [EVENT_ID <event_id1>] [EXPECTED_VERSION <version1>] [TIMESTAMP <timestamp1>] [PAYLOAD <payload1>] [METADATA <metadata1>] [<stream_id2> <event_name2> ...]
/// ```
///
/// The optional arguments of an event are parsed in any order and are
/// intended to be given at most once per event. At least one event is
/// required.
///
/// # Parameters
/// - `partition_key`: UUID that determines which partition all events will be
///   written to
/// - For each event:
///   - `stream_id`: Stream identifier to append the event to
///   - `event_name`: Name/type of the event
///   - `event_id` (optional): UUID for the event (auto-generated if not
///     provided)
///   - `expected_version` (optional): Expected stream version (number, "any",
///     "exists", "empty")
///   - `timestamp` (optional): Event timestamp in milliseconds (the server's
///     current time is used if not provided)
///   - `payload` (optional): Event payload data
///   - `metadata` (optional): Event metadata
///
/// # Example
/// ```text
/// EMAPPEND 550e8400-e29b-41d4-a716-446655440000 stream1 EventA PAYLOAD '{"data":"value1"}' stream2 EventB PAYLOAD '{"data":"value2"}'
/// ```
///
/// **Note:** All events are appended atomically in a single transaction.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct EMAppend {
    /// Key that is hashed to select the partition every event is written to.
    pub partition_key: Uuid,
    /// Events to append, in the order they appeared in the command.
    pub events: Vec<Event>,
}
56
/// A single event entry parsed from an `EMAPPEND` command.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Event {
    /// Stream the event is appended to.
    pub stream_id: StreamId,
    /// Name/type of the event.
    pub event_name: String,
    /// Client-supplied event id; `None` means the server generates one.
    pub event_id: Option<Uuid>,
    /// Expected stream version; left at its `Default` value when the
    /// `EXPECTED_VERSION` option is omitted (presumably `ExpectedVersion::Any`
    /// — confirm against `sierradb_protocol`).
    pub expected_version: ExpectedVersion,
    /// Client-supplied timestamp in milliseconds; `None` means the server
    /// uses its current time.
    pub timestamp: Option<u64>,
    /// Raw payload bytes; empty when the `PAYLOAD` option is omitted.
    pub payload: Vec<u8>,
    /// Raw metadata bytes; empty when the `METADATA` option is omitted.
    pub metadata: Vec<u8>,
}
67
68impl Event {
69    fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = Event> + 'a {
70        (
71            stream_id(),
72            string().expected("event name"),
73            many::<Vec<_>, _, _>(OptionalArg::parser()),
74        )
75            .and_then(|(stream_id, event_name, args)| {
76                let mut cmd = Event {
77                    stream_id,
78                    event_name: event_name.to_string(),
79                    ..Default::default()
80                };
81
82                for arg in args {
83                    match arg {
84                        OptionalArg::EventId(event_id) => {
85                            if cmd.event_id.is_some() {
86                                return Err(easy::Error::message_format(
87                                    "event id already specified",
88                                ));
89                            }
90
91                            cmd.event_id = Some(event_id);
92                        }
93                        OptionalArg::ExpectedVersion(expected_version) => {
94                            if !matches!(cmd.expected_version, ExpectedVersion::Any) {
95                                return Err(easy::Error::message_format(
96                                    "expected version already specified",
97                                ));
98                            }
99
100                            cmd.expected_version = expected_version;
101                        }
102                        OptionalArg::Timestamp(timestamp) => {
103                            if cmd.timestamp.is_some() {
104                                return Err(easy::Error::message_format(
105                                    "timestamp already specified",
106                                ));
107                            }
108
109                            cmd.timestamp = Some(timestamp);
110                        }
111                        OptionalArg::Payload(payload) => {
112                            if !cmd.payload.is_empty() {
113                                return Err(easy::Error::message_format(
114                                    "payload already specified",
115                                ));
116                            }
117
118                            cmd.payload = payload.to_vec();
119                        }
120                        OptionalArg::Metadata(metadata) => {
121                            if !cmd.metadata.is_empty() {
122                                return Err(easy::Error::message_format(
123                                    "metadata already specified",
124                                ));
125                            }
126
127                            cmd.metadata = metadata.to_vec();
128                        }
129                    }
130                }
131
132                Ok(cmd)
133            })
134    }
135}
136
/// One keyword-prefixed optional argument of an event entry, as produced by
/// [`OptionalArg::parser`].
#[derive(Debug, Clone, PartialEq)]
enum OptionalArg<'a> {
    /// Value following the `EVENT_ID` keyword.
    EventId(Uuid),
    /// Value following the `EXPECTED_VERSION` keyword.
    ExpectedVersion(ExpectedVersion),
    /// Value following the `TIMESTAMP` keyword.
    Timestamp(u64),
    /// Bytes following the `PAYLOAD` keyword, borrowed from the input frames.
    Payload(&'a [u8]),
    /// Bytes following the `METADATA` keyword, borrowed from the input frames.
    Metadata(&'a [u8]),
}
145
146impl<'a> OptionalArg<'a> {
147    fn parser() -> impl Parser<FrameStream<'a>, Output = OptionalArg<'a>> + 'a {
148        let event_id = keyword("EVENT_ID")
149            .with(event_id())
150            .map(OptionalArg::EventId);
151        let expected_version = keyword("EXPECTED_VERSION")
152            .with(expected_version())
153            .map(OptionalArg::ExpectedVersion);
154        let timestamp = keyword("TIMESTAMP")
155            .with(number_u64())
156            .map(OptionalArg::Timestamp);
157        let payload = keyword("PAYLOAD").with(data()).map(OptionalArg::Payload);
158        let metadata = keyword("METADATA").with(data()).map(OptionalArg::Metadata);
159
160        choice!(
161            attempt(event_id),
162            attempt(expected_version),
163            attempt(timestamp),
164            attempt(payload),
165            attempt(metadata)
166        )
167    }
168}
169
170impl EMAppend {
171    pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EMAppend> + 'a {
172        (partition_key(), many1::<Vec<_>, _, _>(Event::parser())).map(|(partition_key, events)| {
173            EMAppend {
174                partition_key,
175                events,
176            }
177        })
178    }
179}
180
181impl HandleRequest for EMAppend {
182    type Error = String;
183    type Ok = EMAppendResp;
184
185    async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
186        let partition_hash = uuid_to_partition_hash(self.partition_key);
187        let partition_id = partition_hash % conn.num_partitions;
188        let now = SystemTime::now()
189            .duration_since(UNIX_EPOCH)
190            .map_redis_err()?
191            .as_nanos() as u64;
192
193        let events: SmallVec<[_; 4]> = self.events
194            .into_iter()
195            .map(|event| {
196                let event_id = event
197                    .event_id
198                    .unwrap_or_else(|| uuid_v7_with_partition_hash(partition_hash));
199                let timestamp = event
200                    .timestamp
201                    .map(|timestamp| {
202                        timestamp.checked_mul(1_000_000).ok_or(
203                            ErrorCode::InvalidArg
204                                .with_message(
205                                    "invalid timestamp format: expected milliseconds, got nanoseconds",
206                                )
207                                .to_string(),
208                        )
209                    })
210                    .transpose()?
211                    .unwrap_or(now);
212                Ok(NewEvent {
213                    event_id,
214                    stream_id: event.stream_id,
215                    stream_version: event.expected_version,
216                    event_name: event.event_name,
217                    timestamp,
218                    metadata: event.metadata,
219                    payload: event.payload,
220                })
221            })
222            .collect::<Result<_, String>>()?;
223        let event_ids_timestamps: SmallVec<[_; 4]> = events
224            .iter()
225            .map(|event| (event.event_id, event.timestamp))
226            .collect();
227
228        let transaction = Transaction::new(self.partition_key, partition_id, events)
229            .map_err(|err| ErrorCode::InvalidArg.with_message(err))?;
230
231        let result = conn
232            .cluster_ref
233            .ask(ExecuteTransaction::new(transaction))
234            .await
235            .map_redis_err()?;
236
237        let stream_versions = result.stream_versions.into_iter();
238        let events = event_ids_timestamps
239            .into_iter()
240            .zip(stream_versions)
241            .map(
242                |((event_id, timestamp), (stream_id, stream_version))| EventInfo {
243                    event_id,
244                    stream_id,
245                    stream_version,
246                    timestamp,
247                },
248            )
249            .collect();
250
251        Ok(Some(EMAppendResp {
252            partition_key: self.partition_key,
253            partition_id,
254            first_partition_sequence: result.first_partition_sequence,
255            last_partition_sequence: result.last_partition_sequence,
256            events,
257        }))
258    }
259}
260
/// Successful result of an `EMAPPEND` command, converted into a RESP3 map
/// for the client.
pub struct EMAppendResp {
    /// Partition key the command was issued with.
    partition_key: Uuid,
    /// Partition the events were written to (partition hash modulo the
    /// connection's partition count).
    partition_id: PartitionId,
    /// `first_partition_sequence` as reported by the executed transaction.
    first_partition_sequence: u64,
    /// `last_partition_sequence` as reported by the executed transaction.
    last_partition_sequence: u64,
    /// Per-event details, in the order produced by the handler.
    events: Vec<EventInfo>,
}
268
/// Details of one appended event as reported back to the client.
struct EventInfo {
    /// Id of the event (client-supplied or server-generated).
    event_id: Uuid,
    /// Stream id taken from the transaction result.
    stream_id: StreamId,
    /// Stream version taken from the transaction result.
    stream_version: u64,
    /// Event timestamp in nanoseconds; converted to milliseconds when
    /// serialized.
    timestamp: u64,
}
275
276impl From<EMAppendResp> for BytesFrame {
277    fn from(resp: EMAppendResp) -> Self {
278        map(indexmap! {
279            simple_str("partition_key") => simple_str(resp.partition_key.to_string()),
280            simple_str("partition_id") => number(resp.partition_id as i64),
281            simple_str("first_partition_sequence") => number(resp.first_partition_sequence as i64),
282            simple_str("last_partition_sequence") => number(resp.last_partition_sequence as i64),
283            simple_str("events") => array(resp.events.into_iter().map(BytesFrame::from).collect()),
284        })
285    }
286}
287
288impl From<EventInfo> for BytesFrame {
289    fn from(info: EventInfo) -> Self {
290        map(indexmap! {
291            simple_str("event_id") => simple_str(info.event_id.to_string()),
292            simple_str("stream_id") => simple_str(info.stream_id.to_string()),
293            simple_str("stream_version") => number(info.stream_version as i64),
294            simple_str("timestamp") => number((info.timestamp / 1_000_000) as i64),
295        })
296    }
297}