1pub mod eack;
2pub mod eappend;
3pub mod eget;
4pub mod emappend;
5pub mod epscan;
6pub mod epseq;
7pub mod epsub;
8pub mod escan;
9pub mod esub;
10pub mod esver;
11pub mod hello;
12pub mod ping;
13
14use std::collections::HashMap;
15use std::num::{ParseIntError, TryFromIntError};
16
17use bytes::Bytes;
18use combine::{Parser, eof};
19use redis_protocol::resp3::types::{BytesFrame, VerbatimStringFormat};
20use sierradb::StreamId;
21use sierradb::bucket::PartitionId;
22use sierradb::bucket::segment::EventRecord;
23use sierradb::database::ExpectedVersion;
24use sierradb::id::uuid_to_partition_hash;
25use sierradb_cluster::subscription::{FromSequences, SubscriptionMatcher};
26use sierradb_protocol::ErrorCode;
27use tokio::io;
28use tracing::debug;
29use uuid::Uuid;
30
31use crate::parser::frame_stream;
32use crate::request::eack::EAck;
33use crate::request::eappend::EAppend;
34use crate::request::eget::EGet;
35use crate::request::emappend::EMAppend;
36use crate::request::epscan::EPScan;
37use crate::request::epseq::EPSeq;
38use crate::request::epsub::EPSub;
39use crate::request::escan::EScan;
40use crate::request::esub::ESub;
41use crate::request::esver::ESVer;
42use crate::request::hello::Hello;
43use crate::request::ping::Ping;
44use crate::server::Conn;
45
46pub enum Command {
47 EAck,
48 EAppend,
49 EGet,
50 EMAppend,
51 EPScan,
52 EPSeq,
53 EPSub,
54 ESVer,
55 EScan,
56 ESub,
57 Hello,
58 Ping,
59}
60
61impl Command {
62 pub async fn handle(
63 &self,
64 args: &[BytesFrame],
65 conn: &mut Conn,
66 ) -> Result<Option<BytesFrame>, io::Error> {
67 macro_rules! handle_commands {
68 ( $( $name:ident ),* $(,)? ) => {
69 match self {
70 $( Command::$name => {
71 let stream = frame_stream(args);
72 match $name::parser().skip(eof()).parse(stream) {
73 Ok((cmd, _)) => cmd.handle_request_failable(conn).await,
74 Err(err) => {
75 Ok(Some(BytesFrame::SimpleError {
76 data: err.to_string().into(),
77 attributes: None,
78 }))
79 }
80 }
81 } )*
82 }
83 };
84 }
85
86 handle_commands![
87 EAck, EAppend, EGet, EMAppend, EPScan, EPSeq, EPSub, EScan, ESVer, ESub, Hello, Ping
88 ]
89 }
90}
91
92impl TryFrom<&BytesFrame> for Command {
93 type Error = String;
94
95 fn try_from(frame: &BytesFrame) -> Result<Self, Self::Error> {
96 match frame {
97 BytesFrame::BlobString { data, .. }
98 | BytesFrame::SimpleString { data, .. }
99 | BytesFrame::BigNumber { data, .. }
100 | BytesFrame::VerbatimString {
101 data,
102 format: VerbatimStringFormat::Text,
103 ..
104 } => {
105 match str::from_utf8(data)
106 .map_err(|_| "invalid command".to_string())?
107 .to_ascii_uppercase()
108 .as_str()
109 {
110 "EACK" => Ok(Command::EAck),
111 "EAPPEND" => Ok(Command::EAppend),
112 "EGET" => Ok(Command::EGet),
113 "EMAPPEND" => Ok(Command::EMAppend),
114 "EPSCAN" => Ok(Command::EPScan),
115 "EPSEQ" => Ok(Command::EPSeq),
116 "EPSUB" => Ok(Command::EPSub),
117 "ESVER" => Ok(Command::ESVer),
118 "ESCAN" => Ok(Command::EScan),
119 "ESUB" => Ok(Command::ESub),
120 "HELLO" => Ok(Command::Hello),
121 "PING" => Ok(Command::Ping),
122 cmd => {
123 debug!("received unknown command {cmd}");
124 Err(ErrorCode::InvalidArg.with_message(format!("unknown command '{cmd}'")))
125 }
126 }
127 }
128 _ => Err(ErrorCode::InvalidArg.with_message("invalid type for command name")),
129 }
130 }
131}
132
133pub trait HandleRequest: Sized + Send {
134 type Ok: Into<BytesFrame>;
135 type Error: ToString;
136
137 fn handle_request(
138 self,
139 conn: &mut Conn,
140 ) -> impl Future<Output = Result<Option<Self::Ok>, Self::Error>> + Send;
141
142 fn handle_request_failable(
143 self,
144 conn: &mut Conn,
145 ) -> impl Future<Output = Result<Option<BytesFrame>, io::Error>> + Send {
146 async move {
147 match self.handle_request(conn).await {
148 Ok(Some(resp)) => Ok(Some(resp.into())),
149 Ok(None) => Ok(None),
150 Err(err) => Ok(Some(BytesFrame::SimpleError {
151 data: err.to_string().into(),
152 attributes: None,
153 })),
154 }
155 }
156 }
157}
158
159pub trait FromArgs: Sized {
160 fn from_args(args: &[BytesFrame]) -> Result<Self, String>;
161}
162
163impl FromArgs for SubscriptionMatcher {
164 fn from_args(args: &[BytesFrame]) -> Result<Self, String> {
165 let mut i = 0;
166 let kind = <&str>::from_bytes_frame(
167 args.get(i)
168 .ok_or_else(|| ErrorCode::InvalidArg.with_message("missing subscription type"))?,
169 )
170 .map_err(|err| {
171 ErrorCode::InvalidArg.with_message(format!("invalid subscription type: {err}"))
172 })?;
173 i += 1;
174 match kind {
175 "ALL_PARTITIONS" | "all_partitions" => {
176 let from_sequences_kind =
177 <&str>::from_bytes_frame(args.get(i).ok_or_else(|| {
178 ErrorCode::InvalidArg.with_message("missing start filter")
179 })?)
180 .map_err(|err| {
181 ErrorCode::InvalidArg.with_message(format!("invalid start filter: {err}"))
182 })?;
183 i += 1;
184 match from_sequences_kind {
185 "LATEST" | "latest" => Ok(SubscriptionMatcher::AllPartitions {
186 from_sequences: FromSequences::Latest,
187 }),
188 "ALL" | "all" => {
189 let from_sequence =
190 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
191 ErrorCode::InvalidArg.with_message("missing start sequence")
192 })?)
193 .map_err(|err| {
194 ErrorCode::InvalidArg
195 .with_message(format!("invalid start sequence: {err}"))
196 })?;
197
198 Ok(SubscriptionMatcher::AllPartitions {
199 from_sequences: FromSequences::AllPartitions(from_sequence),
200 })
201 }
202 "PARTITIONS" | "partitions" => {
203 let mut from_sequences = HashMap::new();
204
205 loop {
206 let Some(arg) = args.get(i) else {
207 break;
208 };
209
210 i += 1;
211
212 match PartitionId::from_bytes_frame(arg) {
213 Ok(partition_id) => {
214 let from_sequence =
215 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
216 ErrorCode::InvalidArg
217 .with_message("missing from sequence")
218 })?)
219 .map_err(
220 |err| {
221 ErrorCode::InvalidArg.with_message(format!(
222 "invalid from sequence: {err}"
223 ))
224 },
225 )?;
226
227 i += 1;
228
229 from_sequences.insert(partition_id, from_sequence);
230 }
231 Err(err) => {
232 let Ok(fallback_keyword) = <&str>::from_bytes_frame(arg) else {
233 return Err(err);
234 };
235 if fallback_keyword != "FALLBACK"
236 && fallback_keyword != "fallback"
237 {
238 return Err(err);
239 }
240
241 let fallback =
242 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
243 ErrorCode::InvalidArg
244 .with_message("missing fallback sequence")
245 })?)
246 .map_err(
247 |err| {
248 ErrorCode::InvalidArg.with_message(format!(
249 "invalid fallback sequence: {err}"
250 ))
251 },
252 )?;
253
254 return Ok(SubscriptionMatcher::AllPartitions {
255 from_sequences: FromSequences::Partitions {
256 from_sequences,
257 fallback: Some(fallback),
258 },
259 });
260 }
261 }
262 }
263
264 Ok(SubscriptionMatcher::AllPartitions {
265 from_sequences: FromSequences::Partitions {
266 from_sequences,
267 fallback: None,
268 },
269 })
270 }
271 _ => Err(ErrorCode::InvalidArg
272 .with_message(format!("unknown start filter '{from_sequences_kind}'"))),
273 }
274 }
275 "PARTITIONS" | "partitions" => {
276 todo!()
277 }
278 "STREAMS" | "streams" => {
279 todo!()
280 }
281 _ => Err(ErrorCode::InvalidArg.with_message("unknown subscription type '{kind}'")),
282 }
283 }
284}
285
286pub trait FromBytesFrame<'a>: Sized {
287 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String>;
288}
289
290impl<'a, T> FromBytesFrame<'a> for Option<T>
291where
292 T: FromBytesFrame<'a>,
293{
294 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
295 match frame {
296 BytesFrame::Null => Ok(None),
297 _ => Ok(Some(T::from_bytes_frame(frame)?)),
298 }
299 }
300}
301
302impl FromBytesFrame<'_> for u32 {
303 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
304 match frame {
305 BytesFrame::BlobString { data, .. }
306 | BytesFrame::SimpleString { data, .. }
307 | BytesFrame::BigNumber { data, .. }
308 | BytesFrame::VerbatimString {
309 data,
310 format: VerbatimStringFormat::Text,
311 ..
312 } => {
313 let s = std::str::from_utf8(data).map_err(|_| "invalid string".to_string())?;
314 s.parse::<u32>().map_err(|_| "invalid u32".to_string())
315 }
316 BytesFrame::Number { data, .. } => {
317 if *data < 0 {
318 Err("negative number for u32".to_string())
319 } else if *data > u32::MAX as i64 {
320 Err("number too large for u32".to_string())
321 } else {
322 Ok(*data as u32)
323 }
324 }
325 _ => Err("invalid type for u32".to_string()),
326 }
327 }
328}
329
330impl FromBytesFrame<'_> for i64 {
331 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
332 match frame {
333 BytesFrame::BlobString { data, .. }
334 | BytesFrame::SimpleString { data, .. }
335 | BytesFrame::BigNumber { data, .. }
336 | BytesFrame::VerbatimString {
337 data,
338 format: VerbatimStringFormat::Text,
339 ..
340 } => str::from_utf8(data)
341 .map_err(|err| err.to_string())?
342 .parse()
343 .map_err(|err: ParseIntError| err.to_string()),
344 BytesFrame::Number { data, .. } => Ok(*data),
345 _ => Err("unsupported type, expecting i64".to_string()),
346 }
347 }
348}
349
350impl FromBytesFrame<'_> for u64 {
351 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
352 match frame {
353 BytesFrame::BlobString { data, .. }
354 | BytesFrame::SimpleString { data, .. }
355 | BytesFrame::BigNumber { data, .. }
356 | BytesFrame::VerbatimString {
357 data,
358 format: VerbatimStringFormat::Text,
359 ..
360 } => str::from_utf8(data)
361 .map_err(|err| err.to_string())?
362 .parse()
363 .map_err(|err: ParseIntError| err.to_string()),
364 BytesFrame::Number { data, .. } => (*data)
365 .try_into()
366 .map_err(|err: TryFromIntError| err.to_string()),
367 _ => Err("unsupported type, expecting i64".to_string()),
368 }
369 }
370}
371
372impl FromBytesFrame<'_> for u16 {
373 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
374 match frame {
375 BytesFrame::BlobString { data, .. }
376 | BytesFrame::SimpleString { data, .. }
377 | BytesFrame::BigNumber { data, .. }
378 | BytesFrame::VerbatimString {
379 data,
380 format: VerbatimStringFormat::Text,
381 ..
382 } => str::from_utf8(data)
383 .map_err(|err| err.to_string())?
384 .parse()
385 .map_err(|err: ParseIntError| err.to_string()),
386 BytesFrame::Number { data, .. } => (*data)
387 .try_into()
388 .map_err(|err: TryFromIntError| err.to_string()),
389 _ => Err("unsupported type, expecting i64".to_string()),
390 }
391 }
392}
393
394impl<'a> FromBytesFrame<'a> for &'a str {
395 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
396 match frame {
397 BytesFrame::BlobString { data, .. }
398 | BytesFrame::SimpleString { data, .. }
399 | BytesFrame::BigNumber { data, .. }
400 | BytesFrame::VerbatimString {
401 data,
402 format: VerbatimStringFormat::Text,
403 ..
404 } => str::from_utf8(data).map_err(|err| err.to_string()),
405 _ => Err("unsupported type, expecting string".to_string()),
406 }
407 }
408}
409
410impl FromBytesFrame<'_> for String {
411 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
412 <&str>::from_bytes_frame(frame).map(ToOwned::to_owned)
413 }
414}
415
416impl<'a> FromBytesFrame<'a> for &'a [u8] {
417 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
418 match frame {
419 BytesFrame::BlobString { data, .. }
420 | BytesFrame::SimpleString { data, .. }
421 | BytesFrame::BigNumber { data, .. }
422 | BytesFrame::VerbatimString {
423 data,
424 format: VerbatimStringFormat::Text,
425 ..
426 } => Ok(data),
427 _ => Err("unsupported type, expecting bytes".to_string()),
428 }
429 }
430}
431
432impl FromBytesFrame<'_> for Vec<u8> {
433 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
434 <&[u8]>::from_bytes_frame(frame).map(ToOwned::to_owned)
435 }
436}
437
438impl FromBytesFrame<'_> for StreamId {
439 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
440 StreamId::new(<String>::from_bytes_frame(frame)?).map_err(|err| err.to_string())
441 }
442}
443
444impl FromBytesFrame<'_> for Uuid {
445 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
446 <&str>::from_bytes_frame(frame)?
447 .parse()
448 .map_err(|err: uuid::Error| err.to_string())
449 }
450}
451
452impl FromBytesFrame<'_> for ExpectedVersion {
453 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
454 <u64>::from_bytes_frame(frame)
455 .map(ExpectedVersion::Exact)
456 .or_else(|_| {
457 <&str>::from_bytes_frame(frame).and_then(|s| match s {
458 "any" | "ANY" => Ok(ExpectedVersion::Any),
459 "exists" | "EXISTS" => Ok(ExpectedVersion::Exists),
460 "empty" | "EMPTY" => Ok(ExpectedVersion::Empty),
461 _ => Err("unknown expected version value".to_string()),
462 })
463 })
464 }
465}
466
467#[derive(Debug, Clone, PartialEq)]
468pub enum RangeValue {
469 Start, End, Value(u64), }
473
474impl FromBytesFrame<'_> for RangeValue {
475 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
476 <&str>::from_bytes_frame(frame)
477 .and_then(|s| match s {
478 "-" => Ok(RangeValue::Start),
479 "+" => Ok(RangeValue::End),
480 _ => Err(String::default()),
481 })
482 .or_else(|_| <u64>::from_bytes_frame(frame).map(RangeValue::Value))
483 .map_err(|_| "unknown range value, expected '-', '+', or number".to_string())
484 }
485}
486
487#[derive(Clone, Copy, Debug, PartialEq)]
488pub enum PartitionSelector {
489 ById(PartitionId), ByKey(Uuid), }
492
493impl PartitionSelector {
494 pub fn into_partition_id(self, num_partitions: u16) -> PartitionId {
495 match self {
496 PartitionSelector::ById(id) => id,
497 PartitionSelector::ByKey(key) => uuid_to_partition_hash(key) % num_partitions,
498 }
499 }
500}
501
502impl FromBytesFrame<'_> for PartitionSelector {
503 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
504 <Uuid>::from_bytes_frame(frame)
505 .map(PartitionSelector::ByKey)
506 .or_else(|_| <PartitionId>::from_bytes_frame(frame).map(PartitionSelector::ById))
507 }
508}
509
510#[derive(Debug, Clone, PartialEq)]
512pub enum PartitionRange {
513 Single(PartitionSelector),
515 Range(u16, u16),
517 List(Vec<PartitionSelector>),
519 All,
521}
522
523impl PartitionRange {
524 pub fn expand(&self, num_partitions: u16) -> Vec<PartitionId> {
526 match self {
527 PartitionRange::Single(selector) => {
528 vec![selector.into_partition_id(num_partitions)]
529 }
530 PartitionRange::Range(start, end) => {
531 let start = (*start).min(num_partitions.saturating_sub(1));
532 let end = (*end).min(num_partitions.saturating_sub(1));
533 if start <= end {
534 (start..=end).collect()
535 } else {
536 vec![]
537 }
538 }
539 PartitionRange::List(selectors) => selectors
540 .iter()
541 .map(|s| s.into_partition_id(num_partitions))
542 .collect(),
543 PartitionRange::All => (0..num_partitions).collect(),
544 }
545 }
546}
547
548#[derive(Debug, Clone)]
550pub enum FromSequenceSpec {
551 Single(u64),
553 PerPartition(HashMap<u16, u64>),
555}
556
557impl FromSequenceSpec {
558 pub fn get_sequence_for_partition(&self, partition_id: u16) -> Option<u64> {
560 match self {
561 FromSequenceSpec::Single(seq) => Some(*seq),
562 FromSequenceSpec::PerPartition(map) => map.get(&partition_id).copied(),
563 }
564 }
565}
566
567impl FromBytesFrame<'_> for PartitionRange {
568 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
569 if let Ok(s) = <&str>::from_bytes_frame(frame) {
571 if s == "*" {
573 return Ok(PartitionRange::All);
574 }
575
576 if let Some(dash_pos) = s.find('-') {
578 let start_str = &s[..dash_pos];
579 let end_str = &s[dash_pos + 1..];
580
581 let start: u16 = start_str
582 .parse()
583 .map_err(|_| format!("invalid start partition ID in range: '{start_str}'"))?;
584 let end: u16 = end_str
585 .parse()
586 .map_err(|_| format!("invalid end partition ID in range: '{end_str}'"))?;
587
588 return Ok(PartitionRange::Range(start, end));
589 }
590
591 if s.contains(',') {
593 let mut selectors = Vec::new();
594 for part in s.split(',') {
595 let part = part.trim();
596 let selector = if let Ok(uuid) = part.parse::<Uuid>() {
598 PartitionSelector::ByKey(uuid)
599 } else if let Ok(id) = part.parse::<u16>() {
600 PartitionSelector::ById(id)
601 } else {
602 return Err(format!("invalid partition selector in list: '{part}'"));
603 };
604 selectors.push(selector);
605 }
606 return Ok(PartitionRange::List(selectors));
607 }
608 }
609
610 PartitionSelector::from_bytes_frame(frame).map(PartitionRange::Single)
612 }
613}
614
615impl FromBytesFrame<'_> for FromSequenceSpec {
616 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
617 if let Ok(sequence) = u64::from_bytes_frame(frame) {
619 return Ok(FromSequenceSpec::Single(sequence));
620 }
621
622 if let Ok(s) = <&str>::from_bytes_frame(frame) {
624 if let Ok(sequence) = s.parse::<u64>() {
626 return Ok(FromSequenceSpec::Single(sequence));
627 }
628
629 if s.contains(':') {
631 let mut partition_sequences = HashMap::new();
632
633 for pair in s.split(',') {
634 let parts: Vec<&str> = pair.split(':').collect();
635 if parts.len() != 2 {
636 return Err(format!("invalid partition:sequence pair: '{pair}'"));
637 }
638
639 let partition_id: u16 = parts[0]
640 .parse()
641 .map_err(|_| format!("invalid partition ID: '{}'", parts[0]))?;
642 let sequence: u64 = parts[1]
643 .parse()
644 .map_err(|_| format!("invalid sequence number: '{}'", parts[1]))?;
645
646 partition_sequences.insert(partition_id, sequence);
647 }
648
649 if partition_sequences.is_empty() {
650 return Err("no valid partition:sequence pairs found".to_string());
651 }
652
653 return Ok(FromSequenceSpec::PerPartition(partition_sequences));
654 }
655 }
656
657 match frame {
658 BytesFrame::Map { data, .. } => {
660 let mut partition_sequences = HashMap::new();
661
662 for (key_frame, value_frame) in data {
663 let partition_id = match key_frame {
665 BytesFrame::SimpleString { data, .. }
666 | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
667 .map_err(|_| "invalid UTF-8 in partition ID key")?
668 .parse::<u16>()
669 .map_err(|_| "invalid partition ID in map key")?,
670 BytesFrame::Number { data, .. } => {
671 if *data < 0 || *data > u16::MAX as i64 {
672 return Err("partition ID out of range".to_string());
673 }
674 *data as u16
675 }
676 _ => return Err("invalid type for partition ID key in map".to_string()),
677 };
678
679 let sequence = u64::from_bytes_frame(value_frame)
681 .map_err(|_| "invalid sequence value in map")?;
682
683 partition_sequences.insert(partition_id, sequence);
684 }
685
686 Ok(FromSequenceSpec::PerPartition(partition_sequences))
687 }
688
689 BytesFrame::Array { data, .. } => {
691 let mut partition_sequences = HashMap::new();
692
693 for item in data {
694 if let BytesFrame::Array { data: pair, .. } = item {
695 if pair.len() != 2 {
696 return Err(
697 "expected [partition_id, sequence] pairs in array".to_string()
698 );
699 }
700
701 let partition_id = match &pair[0] {
703 BytesFrame::SimpleString { data, .. }
704 | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
705 .map_err(|_| "invalid UTF-8 in partition ID")?
706 .parse::<u16>()
707 .map_err(|_| "invalid partition ID in array pair")?,
708 BytesFrame::Number { data, .. } => {
709 if *data < 0 || *data > u16::MAX as i64 {
710 return Err(
711 "partition ID out of range in array pair".to_string()
712 );
713 }
714 *data as u16
715 }
716 _ => {
717 return Err(
718 "invalid type for partition ID in array pair".to_string()
719 );
720 }
721 };
722
723 let sequence = u64::from_bytes_frame(&pair[1])
725 .map_err(|_| "invalid sequence in array pair")?;
726
727 partition_sequences.insert(partition_id, sequence);
728 } else {
729 return Err(
730 "expected array pairs in FROM_SEQUENCE array format".to_string()
731 );
732 }
733 }
734
735 Ok(FromSequenceSpec::PerPartition(partition_sequences))
736 }
737
738 _ => Err("expected number, map, or array for FROM_SEQUENCE".to_string()),
739 }
740 }
741}
742
743#[inline(always)]
744pub fn simple_str(s: impl Into<Bytes>) -> BytesFrame {
745 BytesFrame::SimpleString {
746 data: s.into(),
747 attributes: None,
748 }
749}
750
751#[inline(always)]
752pub fn blob_str(s: impl Into<Bytes>) -> BytesFrame {
753 BytesFrame::BlobString {
754 data: s.into(),
755 attributes: None,
756 }
757}
758
759#[inline(always)]
760pub fn number(n: i64) -> BytesFrame {
761 BytesFrame::Number {
762 data: n,
763 attributes: None,
764 }
765}
766
767#[inline(always)]
768pub fn map(items: HashMap<BytesFrame, BytesFrame>) -> BytesFrame {
769 BytesFrame::Map {
770 data: items,
771 attributes: None,
772 }
773}
774
775#[inline(always)]
776pub fn array(items: Vec<BytesFrame>) -> BytesFrame {
777 BytesFrame::Array {
778 data: items,
779 attributes: None,
780 }
781}
782
783#[inline(always)]
784pub fn encode_event(record: EventRecord) -> BytesFrame {
785 map(HashMap::from_iter([
786 (
787 simple_str("event_id"),
788 simple_str(record.event_id.to_string()),
789 ),
790 (
791 simple_str("partition_key"),
792 simple_str(record.partition_key.to_string()),
793 ),
794 (
795 simple_str("partition_id"),
796 number(record.partition_id as i64),
797 ),
798 (
799 simple_str("transaction_id"),
800 simple_str(record.transaction_id.to_string()),
801 ),
802 (
803 simple_str("partition_sequence"),
804 number(record.partition_sequence as i64),
805 ),
806 (
807 simple_str("stream_version"),
808 number(record.stream_version as i64),
809 ),
810 (
811 simple_str("timestamp"),
812 number((record.timestamp / 1_000_000) as i64),
813 ),
814 (
815 simple_str("stream_id"),
816 blob_str(record.stream_id.to_string()),
817 ),
818 (simple_str("event_name"), blob_str(record.event_name)),
819 (simple_str("metadata"), blob_str(record.metadata)),
820 (simple_str("payload"), blob_str(record.payload)),
821 ]))
822}