Skip to main content

sierradb_server/request/
eappend.rs

1use std::time::{SystemTime, UNIX_EPOCH};
2
3use combine::error::StreamError;
4use combine::{Parser, attempt, choice, easy, many};
5use indexmap::indexmap;
6use redis_protocol::resp3::types::BytesFrame;
7use sierradb::StreamId;
8use sierradb::bucket::PartitionId;
9use sierradb::database::{NewEvent, Transaction};
10use sierradb::id::{NAMESPACE_PARTITION_KEY, uuid_to_partition_hash, uuid_v7_with_partition_hash};
11use sierradb_cluster::write::execute::ExecuteTransaction;
12use sierradb_protocol::{ErrorCode, ExpectedVersion};
13use smallvec::smallvec;
14use uuid::Uuid;
15
16use crate::error::MapRedisError;
17use crate::parser::{
18    FrameStream, data, event_id, expected_version, keyword, number_u64, partition_key, stream_id,
19    string,
20};
21use crate::request::{HandleRequest, map, number, simple_str};
22use crate::server::Conn;
23
24/// Append an event to a stream.
25///
26/// # Syntax
27/// ```text
28/// EAPPEND <stream_id> <event_name> [EVENT_ID <event_id>] [PARTITION_KEY <partition_key>] [EXPECTED_VERSION <version>] [TIMESTAMP <timestamp>] [PAYLOAD <payload>] [METADATA <metadata>]
29/// ```
30///
31/// # Parameters
32/// - `stream_id`: Stream identifier to append the event to
33/// - `event_name`: Name/type of the event
34/// - `event_id` (optional): UUID for the event (auto-generated if not provided)
35/// - `partition_key` (optional): UUID to determine event partitioning
36/// - `expected_version` (optional): Expected stream version (number, "any",
37///   "exists", "empty")
38/// - `payload` (optional): Event payload data
39/// - `metadata` (optional): Event metadata
40///
41/// # Example
42/// ```text
43/// EAPPEND my-stream UserCreated PAYLOAD '{"name":"john"}' METADATA '{"source":"api"}'
44/// ```
45#[derive(Clone, Debug, Default)]
46pub struct EAppend {
47    pub stream_id: StreamId,
48    pub event_name: String,
49    pub event_id: Option<Uuid>,
50    pub partition_key: Option<Uuid>,
51    pub expected_version: ExpectedVersion,
52    pub timestamp: Option<u64>,
53    pub payload: Vec<u8>,
54    pub metadata: Vec<u8>,
55}
56
57#[derive(Debug, Clone, PartialEq)]
58enum OptionalArg<'a> {
59    EventId(Uuid),
60    PartitionKey(Uuid),
61    ExpectedVersion(ExpectedVersion),
62    Timestamp(u64),
63    Payload(&'a [u8]),
64    Metadata(&'a [u8]),
65}
66
67impl<'a> OptionalArg<'a> {
68    fn parser() -> impl Parser<FrameStream<'a>, Output = OptionalArg<'a>> + 'a {
69        let event_id = keyword("EVENT_ID")
70            .with(event_id())
71            .map(OptionalArg::EventId);
72        let partition_key = keyword("PARTITION_KEY")
73            .with(partition_key())
74            .map(OptionalArg::PartitionKey);
75        let expected_version = keyword("EXPECTED_VERSION")
76            .with(expected_version())
77            .map(OptionalArg::ExpectedVersion);
78        let timestamp = keyword("TIMESTAMP")
79            .with(number_u64())
80            .map(OptionalArg::Timestamp);
81        let payload = keyword("PAYLOAD").with(data()).map(OptionalArg::Payload);
82        let metadata = keyword("METADATA").with(data()).map(OptionalArg::Metadata);
83
84        choice!(
85            attempt(event_id),
86            attempt(partition_key),
87            attempt(expected_version),
88            attempt(timestamp),
89            attempt(payload),
90            attempt(metadata)
91        )
92    }
93}
94
95impl EAppend {
96    pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EAppend> + 'a {
97        (
98            stream_id(),
99            string().expected("event name"),
100            many::<Vec<_>, _, _>(OptionalArg::parser()),
101        )
102            .and_then(|(stream_id, event_name, args)| {
103                let mut cmd = EAppend {
104                    stream_id,
105                    event_name: event_name.to_string(),
106                    ..Default::default()
107                };
108
109                for arg in args {
110                    match arg {
111                        OptionalArg::EventId(event_id) => {
112                            if cmd.event_id.is_some() {
113                                return Err(easy::Error::message_format(
114                                    "event id already specified",
115                                ));
116                            }
117
118                            cmd.event_id = Some(event_id);
119                        }
120                        OptionalArg::PartitionKey(partition_key) => {
121                            if cmd.partition_key.is_some() {
122                                return Err(easy::Error::message_format(
123                                    "partition key already specified",
124                                ));
125                            }
126
127                            cmd.partition_key = Some(partition_key);
128                        }
129                        OptionalArg::ExpectedVersion(expected_version) => {
130                            if !matches!(cmd.expected_version, ExpectedVersion::Any) {
131                                return Err(easy::Error::message_format(
132                                    "expected version already specified",
133                                ));
134                            }
135
136                            cmd.expected_version = expected_version;
137                        }
138                        OptionalArg::Timestamp(timestamp) => {
139                            if cmd.timestamp.is_some() {
140                                return Err(easy::Error::message_format(
141                                    "timestamp already specified",
142                                ));
143                            }
144
145                            cmd.timestamp = Some(timestamp);
146                        }
147                        OptionalArg::Payload(payload) => {
148                            if !cmd.payload.is_empty() {
149                                return Err(easy::Error::message_format(
150                                    "payload already specified",
151                                ));
152                            }
153
154                            cmd.payload = payload.to_vec();
155                        }
156                        OptionalArg::Metadata(metadata) => {
157                            if !cmd.metadata.is_empty() {
158                                return Err(easy::Error::message_format(
159                                    "metadata already specified",
160                                ));
161                            }
162
163                            cmd.metadata = metadata.to_vec();
164                        }
165                    }
166                }
167
168                Ok(cmd)
169            })
170    }
171}
172
173impl HandleRequest for EAppend {
174    type Error = String;
175    type Ok = EAppendResp;
176
177    async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
178        let partition_key = self
179            .partition_key
180            .unwrap_or_else(|| Uuid::new_v5(&NAMESPACE_PARTITION_KEY, self.stream_id.as_bytes()));
181        let partition_hash = uuid_to_partition_hash(partition_key);
182        let event_id = self
183            .event_id
184            .unwrap_or_else(|| uuid_v7_with_partition_hash(partition_hash));
185
186        let partition_id = partition_hash % conn.num_partitions;
187        let timestamp = self
188            .timestamp
189            .map(|timestamp| {
190                timestamp.checked_mul(1_000_000).ok_or(
191                    ErrorCode::InvalidArg
192                        .with_message(
193                            "invalid timestamp format: expected milliseconds, got nanoseconds",
194                        )
195                        .to_string(),
196                )
197            })
198            .unwrap_or_else(|| {
199                Ok(SystemTime::now()
200                    .duration_since(UNIX_EPOCH)
201                    .map_redis_err()?
202                    .as_nanos() as u64)
203            })?;
204
205        let transaction = match Transaction::new(
206            partition_key,
207            partition_id,
208            smallvec![NewEvent {
209                event_id,
210                stream_id: self.stream_id,
211                stream_version: self.expected_version,
212                event_name: self.event_name,
213                timestamp,
214                metadata: self.metadata,
215                payload: self.payload,
216            }],
217        ) {
218            Ok(transaction) => transaction,
219            Err(err) => return Err(err.to_string()),
220        };
221
222        let append = conn
223            .cluster_ref
224            .ask(ExecuteTransaction::new(transaction))
225            .await
226            .map_redis_err()?;
227
228        let mut stream_versions = append.stream_versions.into_iter();
229        let (_, stream_version) = stream_versions.next().unwrap();
230        debug_assert_eq!(stream_versions.next(), None);
231        debug_assert_eq!(
232            append.first_partition_sequence,
233            append.last_partition_sequence
234        );
235
236        Ok(Some(EAppendResp {
237            event_id,
238            partition_key,
239            partition_id,
240            partition_sequence: append.first_partition_sequence,
241            stream_version,
242            timestamp,
243        }))
244    }
245}
246
247pub struct EAppendResp {
248    event_id: Uuid,
249    partition_key: Uuid,
250    partition_id: PartitionId,
251    partition_sequence: u64,
252    stream_version: u64,
253    timestamp: u64,
254}
255
256impl From<EAppendResp> for BytesFrame {
257    fn from(resp: EAppendResp) -> Self {
258        map(indexmap! {
259            simple_str("event_id") => simple_str(resp.event_id.to_string()),
260            simple_str("partition_key") => simple_str(resp.partition_key.to_string()),
261            simple_str("partition_id") => number(resp.partition_id as i64),
262            simple_str("partition_sequence") => number(resp.partition_sequence as i64),
263            simple_str("stream_version") => number(resp.stream_version as i64),
264            simple_str("timestamp") => number((resp.timestamp / 1_000_000) as i64),
265        })
266    }
267}