1pub mod eack;
2pub mod eappend;
3pub mod eget;
4pub mod emappend;
5pub mod epscan;
6pub mod epseq;
7pub mod epsub;
8pub mod escan;
9pub mod esub;
10pub mod esver;
11pub mod hello;
12pub mod info;
13pub mod ping;
14
15use std::collections::HashMap;
16use std::num::{ParseIntError, TryFromIntError};
17
18use bytes::Bytes;
19use combine::{Parser, eof};
20use indexmap::{IndexMap, indexmap};
21use redis_protocol::resp3::types::{BytesFrame, VerbatimStringFormat};
22use sierradb::StreamId;
23use sierradb::bucket::PartitionId;
24use sierradb::bucket::segment::EventRecord;
25use sierradb::database::ExpectedVersion;
26use sierradb::id::uuid_to_partition_hash;
27use sierradb_cluster::subscription::{FromSequences, SubscriptionMatcher};
28use sierradb_protocol::ErrorCode;
29use tokio::io;
30use tracing::debug;
31use uuid::Uuid;
32
33use crate::parser::frame_stream;
34use crate::request::eack::EAck;
35use crate::request::eappend::EAppend;
36use crate::request::eget::EGet;
37use crate::request::emappend::EMAppend;
38use crate::request::epscan::EPScan;
39use crate::request::epseq::EPSeq;
40use crate::request::epsub::EPSub;
41use crate::request::escan::EScan;
42use crate::request::esub::ESub;
43use crate::request::esver::ESVer;
44use crate::request::hello::Hello;
45use crate::request::info::Info;
46use crate::request::ping::Ping;
47use crate::server::Conn;
48
/// The commands this server can dispatch.
///
/// A variant is chosen from the command-name frame by the
/// `TryFrom<&BytesFrame>` implementation (matching is case-insensitive for
/// ASCII) and executed through `Command::handle`.
pub enum Command {
    /// Selected by the `EACK` command name.
    EAck,
    /// Selected by the `EAPPEND` command name.
    EAppend,
    /// Selected by the `EGET` command name.
    EGet,
    /// Selected by the `EMAPPEND` command name.
    EMAppend,
    /// Selected by the `EPSCAN` command name.
    EPScan,
    /// Selected by the `EPSEQ` command name.
    EPSeq,
    /// Selected by the `EPSUB` command name.
    EPSub,
    /// Selected by the `ESVER` command name.
    ESVer,
    /// Selected by the `ESCAN` command name.
    EScan,
    /// Selected by the `ESUB` command name.
    ESub,
    /// Selected by the `HELLO` command name.
    Hello,
    /// Selected by the `INFO` command name.
    Info,
    /// Selected by the `PING` command name.
    Ping,
}
64
65impl Command {
66 pub async fn handle(
67 &self,
68 args: &[BytesFrame],
69 conn: &mut Conn,
70 ) -> Result<Option<BytesFrame>, io::Error> {
71 macro_rules! handle_commands {
72 ( $( $name:ident ),* $(,)? ) => {
73 match self {
74 $( Command::$name => {
75 let stream = frame_stream(args);
76 match $name::parser().skip(eof()).parse(stream) {
77 Ok((cmd, _)) => cmd.handle_request_failable(conn).await,
78 Err(err) => {
79 Ok(Some(BytesFrame::SimpleError {
80 data: err.to_string().into(),
81 attributes: None,
82 }))
83 }
84 }
85 } )*
86 }
87 };
88 }
89
90 handle_commands![
91 EAck, EAppend, EGet, EMAppend, EPScan, EPSeq, EPSub, EScan, ESVer, ESub, Hello, Info,
92 Ping
93 ]
94 }
95}
96
97impl TryFrom<&BytesFrame> for Command {
98 type Error = String;
99
100 fn try_from(frame: &BytesFrame) -> Result<Self, Self::Error> {
101 match frame {
102 BytesFrame::BlobString { data, .. }
103 | BytesFrame::SimpleString { data, .. }
104 | BytesFrame::BigNumber { data, .. }
105 | BytesFrame::VerbatimString {
106 data,
107 format: VerbatimStringFormat::Text,
108 ..
109 } => {
110 match str::from_utf8(data)
111 .map_err(|_| "invalid command".to_string())?
112 .to_ascii_uppercase()
113 .as_str()
114 {
115 "EACK" => Ok(Command::EAck),
116 "EAPPEND" => Ok(Command::EAppend),
117 "EGET" => Ok(Command::EGet),
118 "EMAPPEND" => Ok(Command::EMAppend),
119 "EPSCAN" => Ok(Command::EPScan),
120 "EPSEQ" => Ok(Command::EPSeq),
121 "EPSUB" => Ok(Command::EPSub),
122 "ESVER" => Ok(Command::ESVer),
123 "ESCAN" => Ok(Command::EScan),
124 "ESUB" => Ok(Command::ESub),
125 "HELLO" => Ok(Command::Hello),
126 "INFO" => Ok(Command::Info),
127 "PING" => Ok(Command::Ping),
128 cmd => {
129 debug!("received unknown command {cmd}");
130 Err(ErrorCode::InvalidArg.with_message(format!("unknown command '{cmd}'")))
131 }
132 }
133 }
134 _ => Err(ErrorCode::InvalidArg.with_message("invalid type for command name")),
135 }
136 }
137}
138
139pub trait HandleRequest: Sized + Send {
140 type Ok: Into<BytesFrame>;
141 type Error: ToString;
142
143 fn handle_request(
144 self,
145 conn: &mut Conn,
146 ) -> impl Future<Output = Result<Option<Self::Ok>, Self::Error>> + Send;
147
148 fn handle_request_failable(
149 self,
150 conn: &mut Conn,
151 ) -> impl Future<Output = Result<Option<BytesFrame>, io::Error>> + Send {
152 async move {
153 match self.handle_request(conn).await {
154 Ok(Some(resp)) => Ok(Some(resp.into())),
155 Ok(None) => Ok(None),
156 Err(err) => Ok(Some(BytesFrame::SimpleError {
157 data: err.to_string().into(),
158 attributes: None,
159 })),
160 }
161 }
162 }
163}
164
/// Builds a value from a slice of argument frames.
pub trait FromArgs: Sized {
    /// Parses `args` into `Self`, returning an error message on failure.
    fn from_args(args: &[BytesFrame]) -> Result<Self, String>;
}
168
169impl FromArgs for SubscriptionMatcher {
170 fn from_args(args: &[BytesFrame]) -> Result<Self, String> {
171 let mut i = 0;
172 let kind = <&str>::from_bytes_frame(
173 args.get(i)
174 .ok_or_else(|| ErrorCode::InvalidArg.with_message("missing subscription type"))?,
175 )
176 .map_err(|err| {
177 ErrorCode::InvalidArg.with_message(format!("invalid subscription type: {err}"))
178 })?;
179 i += 1;
180 match kind {
181 "ALL_PARTITIONS" | "all_partitions" => {
182 let from_sequences_kind =
183 <&str>::from_bytes_frame(args.get(i).ok_or_else(|| {
184 ErrorCode::InvalidArg.with_message("missing start filter")
185 })?)
186 .map_err(|err| {
187 ErrorCode::InvalidArg.with_message(format!("invalid start filter: {err}"))
188 })?;
189 i += 1;
190 match from_sequences_kind {
191 "LATEST" | "latest" => Ok(SubscriptionMatcher::AllPartitions {
192 from_sequences: FromSequences::Latest,
193 }),
194 "ALL" | "all" => {
195 let from_sequence =
196 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
197 ErrorCode::InvalidArg.with_message("missing start sequence")
198 })?)
199 .map_err(|err| {
200 ErrorCode::InvalidArg
201 .with_message(format!("invalid start sequence: {err}"))
202 })?;
203
204 Ok(SubscriptionMatcher::AllPartitions {
205 from_sequences: FromSequences::AllPartitions(from_sequence),
206 })
207 }
208 "PARTITIONS" | "partitions" => {
209 let mut from_sequences = HashMap::new();
210
211 loop {
212 let Some(arg) = args.get(i) else {
213 break;
214 };
215
216 i += 1;
217
218 match PartitionId::from_bytes_frame(arg) {
219 Ok(partition_id) => {
220 let from_sequence =
221 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
222 ErrorCode::InvalidArg
223 .with_message("missing from sequence")
224 })?)
225 .map_err(
226 |err| {
227 ErrorCode::InvalidArg.with_message(format!(
228 "invalid from sequence: {err}"
229 ))
230 },
231 )?;
232
233 i += 1;
234
235 from_sequences.insert(partition_id, from_sequence);
236 }
237 Err(err) => {
238 let Ok(fallback_keyword) = <&str>::from_bytes_frame(arg) else {
239 return Err(err);
240 };
241 if fallback_keyword != "FALLBACK"
242 && fallback_keyword != "fallback"
243 {
244 return Err(err);
245 }
246
247 let fallback =
248 u64::from_bytes_frame(args.get(i).ok_or_else(|| {
249 ErrorCode::InvalidArg
250 .with_message("missing fallback sequence")
251 })?)
252 .map_err(
253 |err| {
254 ErrorCode::InvalidArg.with_message(format!(
255 "invalid fallback sequence: {err}"
256 ))
257 },
258 )?;
259
260 return Ok(SubscriptionMatcher::AllPartitions {
261 from_sequences: FromSequences::Partitions {
262 from_sequences,
263 fallback: Some(fallback),
264 },
265 });
266 }
267 }
268 }
269
270 Ok(SubscriptionMatcher::AllPartitions {
271 from_sequences: FromSequences::Partitions {
272 from_sequences,
273 fallback: None,
274 },
275 })
276 }
277 _ => Err(ErrorCode::InvalidArg
278 .with_message(format!("unknown start filter '{from_sequences_kind}'"))),
279 }
280 }
281 "PARTITIONS" | "partitions" => {
282 todo!()
283 }
284 "STREAMS" | "streams" => {
285 todo!()
286 }
287 _ => Err(ErrorCode::InvalidArg.with_message("unknown subscription type '{kind}'")),
288 }
289 }
290}
291
/// Conversion from a single borrowed [`BytesFrame`] into a typed value.
///
/// The `'a` lifetime ties the result to the frame, which allows borrowing
/// implementations such as `&'a str` and `&'a [u8]`.
pub trait FromBytesFrame<'a>: Sized {
    /// Attempts the conversion, returning an error message on failure.
    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String>;
}
295
296impl<'a, T> FromBytesFrame<'a> for Option<T>
297where
298 T: FromBytesFrame<'a>,
299{
300 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
301 match frame {
302 BytesFrame::Null => Ok(None),
303 _ => Ok(Some(T::from_bytes_frame(frame)?)),
304 }
305 }
306}
307
308impl FromBytesFrame<'_> for u32 {
309 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
310 match frame {
311 BytesFrame::BlobString { data, .. }
312 | BytesFrame::SimpleString { data, .. }
313 | BytesFrame::BigNumber { data, .. }
314 | BytesFrame::VerbatimString {
315 data,
316 format: VerbatimStringFormat::Text,
317 ..
318 } => {
319 let s = std::str::from_utf8(data).map_err(|_| "invalid string".to_string())?;
320 s.parse::<u32>().map_err(|_| "invalid u32".to_string())
321 }
322 BytesFrame::Number { data, .. } => {
323 if *data < 0 {
324 Err("negative number for u32".to_string())
325 } else if *data > u32::MAX as i64 {
326 Err("number too large for u32".to_string())
327 } else {
328 Ok(*data as u32)
329 }
330 }
331 _ => Err("invalid type for u32".to_string()),
332 }
333 }
334}
335
336impl FromBytesFrame<'_> for i64 {
337 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
338 match frame {
339 BytesFrame::BlobString { data, .. }
340 | BytesFrame::SimpleString { data, .. }
341 | BytesFrame::BigNumber { data, .. }
342 | BytesFrame::VerbatimString {
343 data,
344 format: VerbatimStringFormat::Text,
345 ..
346 } => str::from_utf8(data)
347 .map_err(|err| err.to_string())?
348 .parse()
349 .map_err(|err: ParseIntError| err.to_string()),
350 BytesFrame::Number { data, .. } => Ok(*data),
351 _ => Err("unsupported type, expecting i64".to_string()),
352 }
353 }
354}
355
356impl FromBytesFrame<'_> for u64 {
357 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
358 match frame {
359 BytesFrame::BlobString { data, .. }
360 | BytesFrame::SimpleString { data, .. }
361 | BytesFrame::BigNumber { data, .. }
362 | BytesFrame::VerbatimString {
363 data,
364 format: VerbatimStringFormat::Text,
365 ..
366 } => str::from_utf8(data)
367 .map_err(|err| err.to_string())?
368 .parse()
369 .map_err(|err: ParseIntError| err.to_string()),
370 BytesFrame::Number { data, .. } => (*data)
371 .try_into()
372 .map_err(|err: TryFromIntError| err.to_string()),
373 _ => Err("unsupported type, expecting i64".to_string()),
374 }
375 }
376}
377
378impl FromBytesFrame<'_> for u16 {
379 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
380 match frame {
381 BytesFrame::BlobString { data, .. }
382 | BytesFrame::SimpleString { data, .. }
383 | BytesFrame::BigNumber { data, .. }
384 | BytesFrame::VerbatimString {
385 data,
386 format: VerbatimStringFormat::Text,
387 ..
388 } => str::from_utf8(data)
389 .map_err(|err| err.to_string())?
390 .parse()
391 .map_err(|err: ParseIntError| err.to_string()),
392 BytesFrame::Number { data, .. } => (*data)
393 .try_into()
394 .map_err(|err: TryFromIntError| err.to_string()),
395 _ => Err("unsupported type, expecting i64".to_string()),
396 }
397 }
398}
399
400impl<'a> FromBytesFrame<'a> for &'a str {
401 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
402 match frame {
403 BytesFrame::BlobString { data, .. }
404 | BytesFrame::SimpleString { data, .. }
405 | BytesFrame::BigNumber { data, .. }
406 | BytesFrame::VerbatimString {
407 data,
408 format: VerbatimStringFormat::Text,
409 ..
410 } => str::from_utf8(data).map_err(|err| err.to_string()),
411 _ => Err("unsupported type, expecting string".to_string()),
412 }
413 }
414}
415
416impl FromBytesFrame<'_> for String {
417 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
418 <&str>::from_bytes_frame(frame).map(ToOwned::to_owned)
419 }
420}
421
422impl<'a> FromBytesFrame<'a> for &'a [u8] {
423 fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
424 match frame {
425 BytesFrame::BlobString { data, .. }
426 | BytesFrame::SimpleString { data, .. }
427 | BytesFrame::BigNumber { data, .. }
428 | BytesFrame::VerbatimString {
429 data,
430 format: VerbatimStringFormat::Text,
431 ..
432 } => Ok(data),
433 _ => Err("unsupported type, expecting bytes".to_string()),
434 }
435 }
436}
437
438impl FromBytesFrame<'_> for Vec<u8> {
439 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
440 <&[u8]>::from_bytes_frame(frame).map(ToOwned::to_owned)
441 }
442}
443
444impl FromBytesFrame<'_> for StreamId {
445 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
446 StreamId::new(<String>::from_bytes_frame(frame)?).map_err(|err| err.to_string())
447 }
448}
449
450impl FromBytesFrame<'_> for Uuid {
451 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
452 <&str>::from_bytes_frame(frame)?
453 .parse()
454 .map_err(|err: uuid::Error| err.to_string())
455 }
456}
457
458impl FromBytesFrame<'_> for ExpectedVersion {
459 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
460 <u64>::from_bytes_frame(frame)
461 .map(ExpectedVersion::Exact)
462 .or_else(|_| {
463 <&str>::from_bytes_frame(frame).and_then(|s| match s {
464 "any" | "ANY" => Ok(ExpectedVersion::Any),
465 "exists" | "EXISTS" => Ok(ExpectedVersion::Exists),
466 "empty" | "EMPTY" => Ok(ExpectedVersion::Empty),
467 _ => Err("unknown expected version value".to_string()),
468 })
469 })
470 }
471}
472
/// One bound of a range argument.
#[derive(Debug, Clone, PartialEq)]
pub enum RangeValue {
    /// Written as `-` on the wire.
    Start,
    /// Written as `+` on the wire.
    End,
    /// An explicit numeric bound.
    Value(u64),
}
479
480impl FromBytesFrame<'_> for RangeValue {
481 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
482 <&str>::from_bytes_frame(frame)
483 .and_then(|s| match s {
484 "-" => Ok(RangeValue::Start),
485 "+" => Ok(RangeValue::End),
486 _ => Err(String::default()),
487 })
488 .or_else(|_| <u64>::from_bytes_frame(frame).map(RangeValue::Value))
489 .map_err(|_| "unknown range value, expected '-', '+', or number".to_string())
490 }
491}
492
/// Identifies one partition, either directly or through a key.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PartitionSelector {
    /// The partition id itself.
    ById(PartitionId),
    /// A UUID key; `into_partition_id` hashes it and reduces it modulo the
    /// partition count.
    ByKey(Uuid),
}
498
499impl PartitionSelector {
500 pub fn into_partition_id(self, num_partitions: u16) -> PartitionId {
501 match self {
502 PartitionSelector::ById(id) => id,
503 PartitionSelector::ByKey(key) => uuid_to_partition_hash(key) % num_partitions,
504 }
505 }
506}
507
508impl FromBytesFrame<'_> for PartitionSelector {
509 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
510 <Uuid>::from_bytes_frame(frame)
511 .map(PartitionSelector::ByKey)
512 .or_else(|_| <PartitionId>::from_bytes_frame(frame).map(PartitionSelector::ById))
513 }
514}
515
/// A set of partitions named by a request argument.
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionRange {
    /// Exactly one partition.
    Single(PartitionSelector),
    /// An inclusive `start-end` range of partition ids.
    Range(u16, u16),
    /// An explicit list of selectors (comma-separated on the wire).
    List(Vec<PartitionSelector>),
    /// Every partition (`*` on the wire).
    All,
}
528
529impl PartitionRange {
530 pub fn expand(&self, num_partitions: u16) -> Vec<PartitionId> {
532 match self {
533 PartitionRange::Single(selector) => {
534 vec![selector.into_partition_id(num_partitions)]
535 }
536 PartitionRange::Range(start, end) => {
537 let start = (*start).min(num_partitions.saturating_sub(1));
538 let end = (*end).min(num_partitions.saturating_sub(1));
539 if start <= end {
540 (start..=end).collect()
541 } else {
542 vec![]
543 }
544 }
545 PartitionRange::List(selectors) => selectors
546 .iter()
547 .map(|s| s.into_partition_id(num_partitions))
548 .collect(),
549 PartitionRange::All => (0..num_partitions).collect(),
550 }
551 }
552}
553
/// Starting sequence specification for a request that spans partitions.
#[derive(Debug, Clone)]
pub enum FromSequenceSpec {
    /// One sequence applied to every partition.
    Single(u64),
    /// Explicit sequence per partition id; partitions not in the map have no
    /// sequence.
    PerPartition(HashMap<u16, u64>),
}
562
563impl FromSequenceSpec {
564 pub fn get_sequence_for_partition(&self, partition_id: u16) -> Option<u64> {
566 match self {
567 FromSequenceSpec::Single(seq) => Some(*seq),
568 FromSequenceSpec::PerPartition(map) => map.get(&partition_id).copied(),
569 }
570 }
571}
572
573impl FromBytesFrame<'_> for PartitionRange {
574 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
575 if let Ok(s) = <&str>::from_bytes_frame(frame) {
577 if s == "*" {
579 return Ok(PartitionRange::All);
580 }
581
582 if let Some(dash_pos) = s.find('-') {
584 let start_str = &s[..dash_pos];
585 let end_str = &s[dash_pos + 1..];
586
587 let start: u16 = start_str
588 .parse()
589 .map_err(|_| format!("invalid start partition ID in range: '{start_str}'"))?;
590 let end: u16 = end_str
591 .parse()
592 .map_err(|_| format!("invalid end partition ID in range: '{end_str}'"))?;
593
594 return Ok(PartitionRange::Range(start, end));
595 }
596
597 if s.contains(',') {
599 let mut selectors = Vec::new();
600 for part in s.split(',') {
601 let part = part.trim();
602 let selector = if let Ok(uuid) = part.parse::<Uuid>() {
604 PartitionSelector::ByKey(uuid)
605 } else if let Ok(id) = part.parse::<u16>() {
606 PartitionSelector::ById(id)
607 } else {
608 return Err(format!("invalid partition selector in list: '{part}'"));
609 };
610 selectors.push(selector);
611 }
612 return Ok(PartitionRange::List(selectors));
613 }
614 }
615
616 PartitionSelector::from_bytes_frame(frame).map(PartitionRange::Single)
618 }
619}
620
621impl FromBytesFrame<'_> for FromSequenceSpec {
622 fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
623 if let Ok(sequence) = u64::from_bytes_frame(frame) {
625 return Ok(FromSequenceSpec::Single(sequence));
626 }
627
628 if let Ok(s) = <&str>::from_bytes_frame(frame) {
630 if let Ok(sequence) = s.parse::<u64>() {
632 return Ok(FromSequenceSpec::Single(sequence));
633 }
634
635 if s.contains(':') {
637 let mut partition_sequences = HashMap::new();
638
639 for pair in s.split(',') {
640 let parts: Vec<&str> = pair.split(':').collect();
641 if parts.len() != 2 {
642 return Err(format!("invalid partition:sequence pair: '{pair}'"));
643 }
644
645 let partition_id: u16 = parts[0]
646 .parse()
647 .map_err(|_| format!("invalid partition ID: '{}'", parts[0]))?;
648 let sequence: u64 = parts[1]
649 .parse()
650 .map_err(|_| format!("invalid sequence number: '{}'", parts[1]))?;
651
652 partition_sequences.insert(partition_id, sequence);
653 }
654
655 if partition_sequences.is_empty() {
656 return Err("no valid partition:sequence pairs found".to_string());
657 }
658
659 return Ok(FromSequenceSpec::PerPartition(partition_sequences));
660 }
661 }
662
663 match frame {
664 BytesFrame::Map { data, .. } => {
666 let mut partition_sequences = HashMap::new();
667
668 for (key_frame, value_frame) in data {
669 let partition_id = match key_frame {
671 BytesFrame::SimpleString { data, .. }
672 | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
673 .map_err(|_| "invalid UTF-8 in partition ID key")?
674 .parse::<u16>()
675 .map_err(|_| "invalid partition ID in map key")?,
676 BytesFrame::Number { data, .. } => {
677 if *data < 0 || *data > u16::MAX as i64 {
678 return Err("partition ID out of range".to_string());
679 }
680 *data as u16
681 }
682 _ => return Err("invalid type for partition ID key in map".to_string()),
683 };
684
685 let sequence = u64::from_bytes_frame(value_frame)
687 .map_err(|_| "invalid sequence value in map")?;
688
689 partition_sequences.insert(partition_id, sequence);
690 }
691
692 Ok(FromSequenceSpec::PerPartition(partition_sequences))
693 }
694
695 BytesFrame::Array { data, .. } => {
697 let mut partition_sequences = HashMap::new();
698
699 for item in data {
700 if let BytesFrame::Array { data: pair, .. } = item {
701 if pair.len() != 2 {
702 return Err(
703 "expected [partition_id, sequence] pairs in array".to_string()
704 );
705 }
706
707 let partition_id = match &pair[0] {
709 BytesFrame::SimpleString { data, .. }
710 | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
711 .map_err(|_| "invalid UTF-8 in partition ID")?
712 .parse::<u16>()
713 .map_err(|_| "invalid partition ID in array pair")?,
714 BytesFrame::Number { data, .. } => {
715 if *data < 0 || *data > u16::MAX as i64 {
716 return Err(
717 "partition ID out of range in array pair".to_string()
718 );
719 }
720 *data as u16
721 }
722 _ => {
723 return Err(
724 "invalid type for partition ID in array pair".to_string()
725 );
726 }
727 };
728
729 let sequence = u64::from_bytes_frame(&pair[1])
731 .map_err(|_| "invalid sequence in array pair")?;
732
733 partition_sequences.insert(partition_id, sequence);
734 } else {
735 return Err(
736 "expected array pairs in FROM_SEQUENCE array format".to_string()
737 );
738 }
739 }
740
741 Ok(FromSequenceSpec::PerPartition(partition_sequences))
742 }
743
744 _ => Err("expected number, map, or array for FROM_SEQUENCE".to_string()),
745 }
746 }
747}
748
749#[inline(always)]
750pub fn simple_str(s: impl Into<Bytes>) -> BytesFrame {
751 BytesFrame::SimpleString {
752 data: s.into(),
753 attributes: None,
754 }
755}
756
757#[inline(always)]
758pub fn blob_str(s: impl Into<Bytes>) -> BytesFrame {
759 BytesFrame::BlobString {
760 data: s.into(),
761 attributes: None,
762 }
763}
764
765#[inline(always)]
766pub fn number(n: i64) -> BytesFrame {
767 BytesFrame::Number {
768 data: n,
769 attributes: None,
770 }
771}
772
773#[inline(always)]
774pub fn double(n: f64) -> BytesFrame {
775 BytesFrame::Double {
776 data: n,
777 attributes: None,
778 }
779}
780
781#[inline(always)]
782pub fn map(items: IndexMap<BytesFrame, BytesFrame>) -> BytesFrame {
783 BytesFrame::Map {
784 data: items,
785 attributes: None,
786 }
787}
788
789#[inline(always)]
790pub fn array(items: Vec<BytesFrame>) -> BytesFrame {
791 BytesFrame::Array {
792 data: items,
793 attributes: None,
794 }
795}
796
797#[inline(always)]
798pub fn encode_event(record: EventRecord) -> BytesFrame {
799 map(indexmap! {
800 simple_str("event_id") => simple_str(record.event_id.to_string()),
801 simple_str("partition_key") => simple_str(record.partition_key.to_string()),
802 simple_str("partition_id") => number(record.partition_id as i64),
803 simple_str("transaction_id") => simple_str(record.transaction_id.to_string()),
804 simple_str("partition_sequence") => number(record.partition_sequence as i64),
805 simple_str("stream_version") => number(record.stream_version as i64),
806 simple_str("timestamp") => number((record.timestamp / 1_000_000) as i64),
807 simple_str("stream_id") => blob_str(record.stream_id.to_string()),
808 simple_str("event_name") => blob_str(record.event_name),
809 simple_str("metadata") => blob_str(record.metadata),
810 simple_str("payload") => blob_str(record.payload),
811 })
812}