1use std::time::{SystemTime, UNIX_EPOCH};
2
3use combine::error::StreamError;
4use combine::{Parser, attempt, choice, easy, many};
5use indexmap::indexmap;
6use redis_protocol::resp3::types::BytesFrame;
7use sierradb::StreamId;
8use sierradb::bucket::PartitionId;
9use sierradb::database::{NewEvent, Transaction};
10use sierradb::id::{NAMESPACE_PARTITION_KEY, uuid_to_partition_hash, uuid_v7_with_partition_hash};
11use sierradb_cluster::write::execute::ExecuteTransaction;
12use sierradb_protocol::{ErrorCode, ExpectedVersion};
13use smallvec::smallvec;
14use uuid::Uuid;
15
16use crate::error::MapRedisError;
17use crate::parser::{
18 FrameStream, data, event_id, expected_version, keyword, number_u64, partition_key, stream_id,
19 string,
20};
21use crate::request::{HandleRequest, map, number, simple_str};
22use crate::server::Conn;
23
24#[derive(Clone, Debug, Default)]
46pub struct EAppend {
47 pub stream_id: StreamId,
48 pub event_name: String,
49 pub event_id: Option<Uuid>,
50 pub partition_key: Option<Uuid>,
51 pub expected_version: ExpectedVersion,
52 pub timestamp: Option<u64>,
53 pub payload: Vec<u8>,
54 pub metadata: Vec<u8>,
55}
56
57#[derive(Debug, Clone, PartialEq)]
58enum OptionalArg<'a> {
59 EventId(Uuid),
60 PartitionKey(Uuid),
61 ExpectedVersion(ExpectedVersion),
62 Timestamp(u64),
63 Payload(&'a [u8]),
64 Metadata(&'a [u8]),
65}
66
67impl<'a> OptionalArg<'a> {
68 fn parser() -> impl Parser<FrameStream<'a>, Output = OptionalArg<'a>> + 'a {
69 let event_id = keyword("EVENT_ID")
70 .with(event_id())
71 .map(OptionalArg::EventId);
72 let partition_key = keyword("PARTITION_KEY")
73 .with(partition_key())
74 .map(OptionalArg::PartitionKey);
75 let expected_version = keyword("EXPECTED_VERSION")
76 .with(expected_version())
77 .map(OptionalArg::ExpectedVersion);
78 let timestamp = keyword("TIMESTAMP")
79 .with(number_u64())
80 .map(OptionalArg::Timestamp);
81 let payload = keyword("PAYLOAD").with(data()).map(OptionalArg::Payload);
82 let metadata = keyword("METADATA").with(data()).map(OptionalArg::Metadata);
83
84 choice!(
85 attempt(event_id),
86 attempt(partition_key),
87 attempt(expected_version),
88 attempt(timestamp),
89 attempt(payload),
90 attempt(metadata)
91 )
92 }
93}
94
95impl EAppend {
96 pub fn parser<'a>() -> impl Parser<FrameStream<'a>, Output = EAppend> + 'a {
97 (
98 stream_id(),
99 string().expected("event name"),
100 many::<Vec<_>, _, _>(OptionalArg::parser()),
101 )
102 .and_then(|(stream_id, event_name, args)| {
103 let mut cmd = EAppend {
104 stream_id,
105 event_name: event_name.to_string(),
106 ..Default::default()
107 };
108
109 for arg in args {
110 match arg {
111 OptionalArg::EventId(event_id) => {
112 if cmd.event_id.is_some() {
113 return Err(easy::Error::message_format(
114 "event id already specified",
115 ));
116 }
117
118 cmd.event_id = Some(event_id);
119 }
120 OptionalArg::PartitionKey(partition_key) => {
121 if cmd.partition_key.is_some() {
122 return Err(easy::Error::message_format(
123 "partition key already specified",
124 ));
125 }
126
127 cmd.partition_key = Some(partition_key);
128 }
129 OptionalArg::ExpectedVersion(expected_version) => {
130 if !matches!(cmd.expected_version, ExpectedVersion::Any) {
131 return Err(easy::Error::message_format(
132 "expected version already specified",
133 ));
134 }
135
136 cmd.expected_version = expected_version;
137 }
138 OptionalArg::Timestamp(timestamp) => {
139 if cmd.timestamp.is_some() {
140 return Err(easy::Error::message_format(
141 "timestamp already specified",
142 ));
143 }
144
145 cmd.timestamp = Some(timestamp);
146 }
147 OptionalArg::Payload(payload) => {
148 if !cmd.payload.is_empty() {
149 return Err(easy::Error::message_format(
150 "payload already specified",
151 ));
152 }
153
154 cmd.payload = payload.to_vec();
155 }
156 OptionalArg::Metadata(metadata) => {
157 if !cmd.metadata.is_empty() {
158 return Err(easy::Error::message_format(
159 "metadata already specified",
160 ));
161 }
162
163 cmd.metadata = metadata.to_vec();
164 }
165 }
166 }
167
168 Ok(cmd)
169 })
170 }
171}
172
173impl HandleRequest for EAppend {
174 type Error = String;
175 type Ok = EAppendResp;
176
177 async fn handle_request(self, conn: &mut Conn) -> Result<Option<Self::Ok>, Self::Error> {
178 let partition_key = self
179 .partition_key
180 .unwrap_or_else(|| Uuid::new_v5(&NAMESPACE_PARTITION_KEY, self.stream_id.as_bytes()));
181 let partition_hash = uuid_to_partition_hash(partition_key);
182 let event_id = self
183 .event_id
184 .unwrap_or_else(|| uuid_v7_with_partition_hash(partition_hash));
185
186 let partition_id = partition_hash % conn.num_partitions;
187 let timestamp = self
188 .timestamp
189 .map(|timestamp| {
190 timestamp.checked_mul(1_000_000).ok_or(
191 ErrorCode::InvalidArg
192 .with_message(
193 "invalid timestamp format: expected milliseconds, got nanoseconds",
194 )
195 .to_string(),
196 )
197 })
198 .unwrap_or_else(|| {
199 Ok(SystemTime::now()
200 .duration_since(UNIX_EPOCH)
201 .map_redis_err()?
202 .as_nanos() as u64)
203 })?;
204
205 let transaction = match Transaction::new(
206 partition_key,
207 partition_id,
208 smallvec![NewEvent {
209 event_id,
210 stream_id: self.stream_id,
211 stream_version: self.expected_version,
212 event_name: self.event_name,
213 timestamp,
214 metadata: self.metadata,
215 payload: self.payload,
216 }],
217 ) {
218 Ok(transaction) => transaction,
219 Err(err) => return Err(err.to_string()),
220 };
221
222 let append = conn
223 .cluster_ref
224 .ask(ExecuteTransaction::new(transaction))
225 .await
226 .map_redis_err()?;
227
228 let mut stream_versions = append.stream_versions.into_iter();
229 let (_, stream_version) = stream_versions.next().unwrap();
230 debug_assert_eq!(stream_versions.next(), None);
231 debug_assert_eq!(
232 append.first_partition_sequence,
233 append.last_partition_sequence
234 );
235
236 Ok(Some(EAppendResp {
237 event_id,
238 partition_key,
239 partition_id,
240 partition_sequence: append.first_partition_sequence,
241 stream_version,
242 timestamp,
243 }))
244 }
245}
246
247pub struct EAppendResp {
248 event_id: Uuid,
249 partition_key: Uuid,
250 partition_id: PartitionId,
251 partition_sequence: u64,
252 stream_version: u64,
253 timestamp: u64,
254}
255
256impl From<EAppendResp> for BytesFrame {
257 fn from(resp: EAppendResp) -> Self {
258 map(indexmap! {
259 simple_str("event_id") => simple_str(resp.event_id.to_string()),
260 simple_str("partition_key") => simple_str(resp.partition_key.to_string()),
261 simple_str("partition_id") => number(resp.partition_id as i64),
262 simple_str("partition_sequence") => number(resp.partition_sequence as i64),
263 simple_str("stream_version") => number(resp.stream_version as i64),
264 simple_str("timestamp") => number((resp.timestamp / 1_000_000) as i64),
265 })
266 }
267}