sierradb_server/
request.rs

1pub mod eack;
2pub mod eappend;
3pub mod eget;
4pub mod emappend;
5pub mod epscan;
6pub mod epseq;
7pub mod epsub;
8pub mod escan;
9pub mod esub;
10pub mod esver;
11pub mod hello;
12pub mod ping;
13
14use std::collections::HashMap;
15use std::num::{ParseIntError, TryFromIntError};
16
17use bytes::Bytes;
18use combine::{Parser, eof};
19use redis_protocol::resp3::types::{BytesFrame, VerbatimStringFormat};
20use sierradb::StreamId;
21use sierradb::bucket::PartitionId;
22use sierradb::bucket::segment::EventRecord;
23use sierradb::database::ExpectedVersion;
24use sierradb::id::uuid_to_partition_hash;
25use sierradb_cluster::subscription::{FromSequences, SubscriptionMatcher};
26use sierradb_protocol::ErrorCode;
27use tokio::io;
28use tracing::debug;
29use uuid::Uuid;
30
31use crate::parser::frame_stream;
32use crate::request::eack::EAck;
33use crate::request::eappend::EAppend;
34use crate::request::eget::EGet;
35use crate::request::emappend::EMAppend;
36use crate::request::epscan::EPScan;
37use crate::request::epseq::EPSeq;
38use crate::request::epsub::EPSub;
39use crate::request::escan::EScan;
40use crate::request::esub::ESub;
41use crate::request::esver::ESVer;
42use crate::request::hello::Hello;
43use crate::request::ping::Ping;
44use crate::server::Conn;
45
/// The set of commands this server understands.
///
/// Each variant shares its name with the request type that parses and
/// executes it (see `Command::handle`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Command {
    EAck,
    EAppend,
    EGet,
    EMAppend,
    EPScan,
    EPSeq,
    EPSub,
    ESVer,
    EScan,
    ESub,
    Hello,
    Ping,
}
60
61impl Command {
62    pub async fn handle(
63        &self,
64        args: &[BytesFrame],
65        conn: &mut Conn,
66    ) -> Result<Option<BytesFrame>, io::Error> {
67        macro_rules! handle_commands {
68            ( $( $name:ident ),* $(,)? ) => {
69                match self {
70                    $( Command::$name => {
71                        let stream = frame_stream(args);
72                        match $name::parser().skip(eof()).parse(stream) {
73                            Ok((cmd, _)) => cmd.handle_request_failable(conn).await,
74                            Err(err) => {
75                                Ok(Some(BytesFrame::SimpleError {
76                                    data: err.to_string().into(),
77                                    attributes: None,
78                                }))
79                            }
80                        }
81                    } )*
82                }
83            };
84        }
85
86        handle_commands![
87            EAck, EAppend, EGet, EMAppend, EPScan, EPSeq, EPSub, EScan, ESVer, ESub, Hello, Ping
88        ]
89    }
90}
91
92impl TryFrom<&BytesFrame> for Command {
93    type Error = String;
94
95    fn try_from(frame: &BytesFrame) -> Result<Self, Self::Error> {
96        match frame {
97            BytesFrame::BlobString { data, .. }
98            | BytesFrame::SimpleString { data, .. }
99            | BytesFrame::BigNumber { data, .. }
100            | BytesFrame::VerbatimString {
101                data,
102                format: VerbatimStringFormat::Text,
103                ..
104            } => {
105                match str::from_utf8(data)
106                    .map_err(|_| "invalid command".to_string())?
107                    .to_ascii_uppercase()
108                    .as_str()
109                {
110                    "EACK" => Ok(Command::EAck),
111                    "EAPPEND" => Ok(Command::EAppend),
112                    "EGET" => Ok(Command::EGet),
113                    "EMAPPEND" => Ok(Command::EMAppend),
114                    "EPSCAN" => Ok(Command::EPScan),
115                    "EPSEQ" => Ok(Command::EPSeq),
116                    "EPSUB" => Ok(Command::EPSub),
117                    "ESVER" => Ok(Command::ESVer),
118                    "ESCAN" => Ok(Command::EScan),
119                    "ESUB" => Ok(Command::ESub),
120                    "HELLO" => Ok(Command::Hello),
121                    "PING" => Ok(Command::Ping),
122                    cmd => {
123                        debug!("received unknown command {cmd}");
124                        Err(ErrorCode::InvalidArg.with_message(format!("unknown command '{cmd}'")))
125                    }
126                }
127            }
128            _ => Err(ErrorCode::InvalidArg.with_message("invalid type for command name")),
129        }
130    }
131}
132
133pub trait HandleRequest: Sized + Send {
134    type Ok: Into<BytesFrame>;
135    type Error: ToString;
136
137    fn handle_request(
138        self,
139        conn: &mut Conn,
140    ) -> impl Future<Output = Result<Option<Self::Ok>, Self::Error>> + Send;
141
142    fn handle_request_failable(
143        self,
144        conn: &mut Conn,
145    ) -> impl Future<Output = Result<Option<BytesFrame>, io::Error>> + Send {
146        async move {
147            match self.handle_request(conn).await {
148                Ok(Some(resp)) => Ok(Some(resp.into())),
149                Ok(None) => Ok(None),
150                Err(err) => Ok(Some(BytesFrame::SimpleError {
151                    data: err.to_string().into(),
152                    attributes: None,
153                })),
154            }
155        }
156    }
157}
158
159pub trait FromArgs: Sized {
160    fn from_args(args: &[BytesFrame]) -> Result<Self, String>;
161}
162
163impl FromArgs for SubscriptionMatcher {
164    fn from_args(args: &[BytesFrame]) -> Result<Self, String> {
165        let mut i = 0;
166        let kind = <&str>::from_bytes_frame(
167            args.get(i)
168                .ok_or_else(|| ErrorCode::InvalidArg.with_message("missing subscription type"))?,
169        )
170        .map_err(|err| {
171            ErrorCode::InvalidArg.with_message(format!("invalid subscription type: {err}"))
172        })?;
173        i += 1;
174        match kind {
175            "ALL_PARTITIONS" | "all_partitions" => {
176                let from_sequences_kind =
177                    <&str>::from_bytes_frame(args.get(i).ok_or_else(|| {
178                        ErrorCode::InvalidArg.with_message("missing start filter")
179                    })?)
180                    .map_err(|err| {
181                        ErrorCode::InvalidArg.with_message(format!("invalid start filter: {err}"))
182                    })?;
183                i += 1;
184                match from_sequences_kind {
185                    "LATEST" | "latest" => Ok(SubscriptionMatcher::AllPartitions {
186                        from_sequences: FromSequences::Latest,
187                    }),
188                    "ALL" | "all" => {
189                        let from_sequence =
190                            u64::from_bytes_frame(args.get(i).ok_or_else(|| {
191                                ErrorCode::InvalidArg.with_message("missing start sequence")
192                            })?)
193                            .map_err(|err| {
194                                ErrorCode::InvalidArg
195                                    .with_message(format!("invalid start sequence: {err}"))
196                            })?;
197
198                        Ok(SubscriptionMatcher::AllPartitions {
199                            from_sequences: FromSequences::AllPartitions(from_sequence),
200                        })
201                    }
202                    "PARTITIONS" | "partitions" => {
203                        let mut from_sequences = HashMap::new();
204
205                        loop {
206                            let Some(arg) = args.get(i) else {
207                                break;
208                            };
209
210                            i += 1;
211
212                            match PartitionId::from_bytes_frame(arg) {
213                                Ok(partition_id) => {
214                                    let from_sequence =
215                                        u64::from_bytes_frame(args.get(i).ok_or_else(|| {
216                                            ErrorCode::InvalidArg
217                                                .with_message("missing from sequence")
218                                        })?)
219                                        .map_err(
220                                            |err| {
221                                                ErrorCode::InvalidArg.with_message(format!(
222                                                    "invalid from sequence: {err}"
223                                                ))
224                                            },
225                                        )?;
226
227                                    i += 1;
228
229                                    from_sequences.insert(partition_id, from_sequence);
230                                }
231                                Err(err) => {
232                                    let Ok(fallback_keyword) = <&str>::from_bytes_frame(arg) else {
233                                        return Err(err);
234                                    };
235                                    if fallback_keyword != "FALLBACK"
236                                        && fallback_keyword != "fallback"
237                                    {
238                                        return Err(err);
239                                    }
240
241                                    let fallback =
242                                        u64::from_bytes_frame(args.get(i).ok_or_else(|| {
243                                            ErrorCode::InvalidArg
244                                                .with_message("missing fallback sequence")
245                                        })?)
246                                        .map_err(
247                                            |err| {
248                                                ErrorCode::InvalidArg.with_message(format!(
249                                                    "invalid fallback sequence: {err}"
250                                                ))
251                                            },
252                                        )?;
253
254                                    return Ok(SubscriptionMatcher::AllPartitions {
255                                        from_sequences: FromSequences::Partitions {
256                                            from_sequences,
257                                            fallback: Some(fallback),
258                                        },
259                                    });
260                                }
261                            }
262                        }
263
264                        Ok(SubscriptionMatcher::AllPartitions {
265                            from_sequences: FromSequences::Partitions {
266                                from_sequences,
267                                fallback: None,
268                            },
269                        })
270                    }
271                    _ => Err(ErrorCode::InvalidArg
272                        .with_message(format!("unknown start filter '{from_sequences_kind}'"))),
273                }
274            }
275            "PARTITIONS" | "partitions" => {
276                todo!()
277            }
278            "STREAMS" | "streams" => {
279                todo!()
280            }
281            _ => Err(ErrorCode::InvalidArg.with_message("unknown subscription type '{kind}'")),
282        }
283    }
284}
285
286pub trait FromBytesFrame<'a>: Sized {
287    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String>;
288}
289
290impl<'a, T> FromBytesFrame<'a> for Option<T>
291where
292    T: FromBytesFrame<'a>,
293{
294    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
295        match frame {
296            BytesFrame::Null => Ok(None),
297            _ => Ok(Some(T::from_bytes_frame(frame)?)),
298        }
299    }
300}
301
302impl FromBytesFrame<'_> for u32 {
303    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
304        match frame {
305            BytesFrame::BlobString { data, .. }
306            | BytesFrame::SimpleString { data, .. }
307            | BytesFrame::BigNumber { data, .. }
308            | BytesFrame::VerbatimString {
309                data,
310                format: VerbatimStringFormat::Text,
311                ..
312            } => {
313                let s = std::str::from_utf8(data).map_err(|_| "invalid string".to_string())?;
314                s.parse::<u32>().map_err(|_| "invalid u32".to_string())
315            }
316            BytesFrame::Number { data, .. } => {
317                if *data < 0 {
318                    Err("negative number for u32".to_string())
319                } else if *data > u32::MAX as i64 {
320                    Err("number too large for u32".to_string())
321                } else {
322                    Ok(*data as u32)
323                }
324            }
325            _ => Err("invalid type for u32".to_string()),
326        }
327    }
328}
329
330impl FromBytesFrame<'_> for i64 {
331    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
332        match frame {
333            BytesFrame::BlobString { data, .. }
334            | BytesFrame::SimpleString { data, .. }
335            | BytesFrame::BigNumber { data, .. }
336            | BytesFrame::VerbatimString {
337                data,
338                format: VerbatimStringFormat::Text,
339                ..
340            } => str::from_utf8(data)
341                .map_err(|err| err.to_string())?
342                .parse()
343                .map_err(|err: ParseIntError| err.to_string()),
344            BytesFrame::Number { data, .. } => Ok(*data),
345            _ => Err("unsupported type, expecting i64".to_string()),
346        }
347    }
348}
349
350impl FromBytesFrame<'_> for u64 {
351    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
352        match frame {
353            BytesFrame::BlobString { data, .. }
354            | BytesFrame::SimpleString { data, .. }
355            | BytesFrame::BigNumber { data, .. }
356            | BytesFrame::VerbatimString {
357                data,
358                format: VerbatimStringFormat::Text,
359                ..
360            } => str::from_utf8(data)
361                .map_err(|err| err.to_string())?
362                .parse()
363                .map_err(|err: ParseIntError| err.to_string()),
364            BytesFrame::Number { data, .. } => (*data)
365                .try_into()
366                .map_err(|err: TryFromIntError| err.to_string()),
367            _ => Err("unsupported type, expecting i64".to_string()),
368        }
369    }
370}
371
372impl FromBytesFrame<'_> for u16 {
373    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
374        match frame {
375            BytesFrame::BlobString { data, .. }
376            | BytesFrame::SimpleString { data, .. }
377            | BytesFrame::BigNumber { data, .. }
378            | BytesFrame::VerbatimString {
379                data,
380                format: VerbatimStringFormat::Text,
381                ..
382            } => str::from_utf8(data)
383                .map_err(|err| err.to_string())?
384                .parse()
385                .map_err(|err: ParseIntError| err.to_string()),
386            BytesFrame::Number { data, .. } => (*data)
387                .try_into()
388                .map_err(|err: TryFromIntError| err.to_string()),
389            _ => Err("unsupported type, expecting i64".to_string()),
390        }
391    }
392}
393
394impl<'a> FromBytesFrame<'a> for &'a str {
395    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
396        match frame {
397            BytesFrame::BlobString { data, .. }
398            | BytesFrame::SimpleString { data, .. }
399            | BytesFrame::BigNumber { data, .. }
400            | BytesFrame::VerbatimString {
401                data,
402                format: VerbatimStringFormat::Text,
403                ..
404            } => str::from_utf8(data).map_err(|err| err.to_string()),
405            _ => Err("unsupported type, expecting string".to_string()),
406        }
407    }
408}
409
410impl FromBytesFrame<'_> for String {
411    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
412        <&str>::from_bytes_frame(frame).map(ToOwned::to_owned)
413    }
414}
415
416impl<'a> FromBytesFrame<'a> for &'a [u8] {
417    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
418        match frame {
419            BytesFrame::BlobString { data, .. }
420            | BytesFrame::SimpleString { data, .. }
421            | BytesFrame::BigNumber { data, .. }
422            | BytesFrame::VerbatimString {
423                data,
424                format: VerbatimStringFormat::Text,
425                ..
426            } => Ok(data),
427            _ => Err("unsupported type, expecting bytes".to_string()),
428        }
429    }
430}
431
432impl FromBytesFrame<'_> for Vec<u8> {
433    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
434        <&[u8]>::from_bytes_frame(frame).map(ToOwned::to_owned)
435    }
436}
437
438impl FromBytesFrame<'_> for StreamId {
439    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
440        StreamId::new(<String>::from_bytes_frame(frame)?).map_err(|err| err.to_string())
441    }
442}
443
444impl FromBytesFrame<'_> for Uuid {
445    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
446        <&str>::from_bytes_frame(frame)?
447            .parse()
448            .map_err(|err: uuid::Error| err.to_string())
449    }
450}
451
452impl FromBytesFrame<'_> for ExpectedVersion {
453    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
454        <u64>::from_bytes_frame(frame)
455            .map(ExpectedVersion::Exact)
456            .or_else(|_| {
457                <&str>::from_bytes_frame(frame).and_then(|s| match s {
458                    "any" | "ANY" => Ok(ExpectedVersion::Any),
459                    "exists" | "EXISTS" => Ok(ExpectedVersion::Exists),
460                    "empty" | "EMPTY" => Ok(ExpectedVersion::Empty),
461                    _ => Err("unknown expected version value".to_string()),
462                })
463            })
464    }
465}
466
467#[derive(Debug, Clone, PartialEq)]
468pub enum RangeValue {
469    Start,      // "-"
470    End,        // "+"
471    Value(u64), // specific number
472}
473
474impl FromBytesFrame<'_> for RangeValue {
475    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
476        <&str>::from_bytes_frame(frame)
477            .and_then(|s| match s {
478                "-" => Ok(RangeValue::Start),
479                "+" => Ok(RangeValue::End),
480                _ => Err(String::default()),
481            })
482            .or_else(|_| <u64>::from_bytes_frame(frame).map(RangeValue::Value))
483            .map_err(|_| "unknown range value, expected '-', '+', or number".to_string())
484    }
485}
486
487#[derive(Clone, Copy, Debug, PartialEq)]
488pub enum PartitionSelector {
489    ById(PartitionId), // 0-65535
490    ByKey(Uuid),       // 550e8400-e29b-41d4-a716-446655440000
491}
492
493impl PartitionSelector {
494    pub fn into_partition_id(self, num_partitions: u16) -> PartitionId {
495        match self {
496            PartitionSelector::ById(id) => id,
497            PartitionSelector::ByKey(key) => uuid_to_partition_hash(key) % num_partitions,
498        }
499    }
500}
501
502impl FromBytesFrame<'_> for PartitionSelector {
503    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
504        <Uuid>::from_bytes_frame(frame)
505            .map(PartitionSelector::ByKey)
506            .or_else(|_| <PartitionId>::from_bytes_frame(frame).map(PartitionSelector::ById))
507    }
508}
509
510/// Represents a range of partitions for multi-partition subscriptions
511#[derive(Debug, Clone, PartialEq)]
512pub enum PartitionRange {
513    /// Single partition (backwards compatibility)
514    Single(PartitionSelector),
515    /// Range of partition IDs (inclusive): "0-127", "50-99"
516    Range(u16, u16),
517    /// Explicit list of partitions: "0,1,5,42"
518    List(Vec<PartitionSelector>),
519    /// All partitions: "*"
520    All,
521}
522
523impl PartitionRange {
524    /// Expand the range into a vector of concrete partition IDs
525    pub fn expand(&self, num_partitions: u16) -> Vec<PartitionId> {
526        match self {
527            PartitionRange::Single(selector) => {
528                vec![selector.into_partition_id(num_partitions)]
529            }
530            PartitionRange::Range(start, end) => {
531                let start = (*start).min(num_partitions.saturating_sub(1));
532                let end = (*end).min(num_partitions.saturating_sub(1));
533                if start <= end {
534                    (start..=end).collect()
535                } else {
536                    vec![]
537                }
538            }
539            PartitionRange::List(selectors) => selectors
540                .iter()
541                .map(|s| s.into_partition_id(num_partitions))
542                .collect(),
543            PartitionRange::All => (0..num_partitions).collect(),
544        }
545    }
546}
547
/// Starting sequence specification for multi-partition subscriptions.
#[derive(Debug, Clone)]
pub enum FromSequenceSpec {
    /// One sequence shared by every partition.
    Single(u64),
    /// A sequence per partition, keyed by partition id.
    PerPartition(HashMap<u16, u64>),
}

impl FromSequenceSpec {
    /// Returns the starting sequence for `partition_id`.
    ///
    /// `Single` always answers with its value; `PerPartition` answers `None`
    /// for partitions missing from the map.
    pub fn get_sequence_for_partition(&self, partition_id: u16) -> Option<u64> {
        match self {
            Self::PerPartition(per_partition) => per_partition.get(&partition_id).copied(),
            Self::Single(sequence) => Some(*sequence),
        }
    }
}
566
567impl FromBytesFrame<'_> for PartitionRange {
568    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
569        // Try to parse as a string first for ranges, lists, and "*"
570        if let Ok(s) = <&str>::from_bytes_frame(frame) {
571            // Check for wildcard "*"
572            if s == "*" {
573                return Ok(PartitionRange::All);
574            }
575
576            // Check for range format "start-end"
577            if let Some(dash_pos) = s.find('-') {
578                let start_str = &s[..dash_pos];
579                let end_str = &s[dash_pos + 1..];
580
581                let start: u16 = start_str
582                    .parse()
583                    .map_err(|_| format!("invalid start partition ID in range: '{start_str}'"))?;
584                let end: u16 = end_str
585                    .parse()
586                    .map_err(|_| format!("invalid end partition ID in range: '{end_str}'"))?;
587
588                return Ok(PartitionRange::Range(start, end));
589            }
590
591            // Check for comma-separated list "0,1,5,42"
592            if s.contains(',') {
593                let mut selectors = Vec::new();
594                for part in s.split(',') {
595                    let part = part.trim();
596                    // Try UUID first, then partition ID
597                    let selector = if let Ok(uuid) = part.parse::<Uuid>() {
598                        PartitionSelector::ByKey(uuid)
599                    } else if let Ok(id) = part.parse::<u16>() {
600                        PartitionSelector::ById(id)
601                    } else {
602                        return Err(format!("invalid partition selector in list: '{part}'"));
603                    };
604                    selectors.push(selector);
605                }
606                return Ok(PartitionRange::List(selectors));
607            }
608        }
609
610        // Fall back to single partition selector (backwards compatibility)
611        PartitionSelector::from_bytes_frame(frame).map(PartitionRange::Single)
612    }
613}
614
615impl FromBytesFrame<'_> for FromSequenceSpec {
616    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
617        // Try parsing as a single number first
618        if let Ok(sequence) = u64::from_bytes_frame(frame) {
619            return Ok(FromSequenceSpec::Single(sequence));
620        }
621
622        // Try parsing as string (could be comma-separated pairs or single number)
623        if let Ok(s) = <&str>::from_bytes_frame(frame) {
624            // Try parsing as single number first
625            if let Ok(sequence) = s.parse::<u64>() {
626                return Ok(FromSequenceSpec::Single(sequence));
627            }
628
629            // Parse as "partition:sequence,partition:sequence" format
630            if s.contains(':') {
631                let mut partition_sequences = HashMap::new();
632
633                for pair in s.split(',') {
634                    let parts: Vec<&str> = pair.split(':').collect();
635                    if parts.len() != 2 {
636                        return Err(format!("invalid partition:sequence pair: '{pair}'"));
637                    }
638
639                    let partition_id: u16 = parts[0]
640                        .parse()
641                        .map_err(|_| format!("invalid partition ID: '{}'", parts[0]))?;
642                    let sequence: u64 = parts[1]
643                        .parse()
644                        .map_err(|_| format!("invalid sequence number: '{}'", parts[1]))?;
645
646                    partition_sequences.insert(partition_id, sequence);
647                }
648
649                if partition_sequences.is_empty() {
650                    return Err("no valid partition:sequence pairs found".to_string());
651                }
652
653                return Ok(FromSequenceSpec::PerPartition(partition_sequences));
654            }
655        }
656
657        match frame {
658            // RESP3 Map format: {"0": 501, "1": 1230}
659            BytesFrame::Map { data, .. } => {
660                let mut partition_sequences = HashMap::new();
661
662                for (key_frame, value_frame) in data {
663                    // Parse partition ID from key
664                    let partition_id = match key_frame {
665                        BytesFrame::SimpleString { data, .. }
666                        | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
667                            .map_err(|_| "invalid UTF-8 in partition ID key")?
668                            .parse::<u16>()
669                            .map_err(|_| "invalid partition ID in map key")?,
670                        BytesFrame::Number { data, .. } => {
671                            if *data < 0 || *data > u16::MAX as i64 {
672                                return Err("partition ID out of range".to_string());
673                            }
674                            *data as u16
675                        }
676                        _ => return Err("invalid type for partition ID key in map".to_string()),
677                    };
678
679                    // Parse sequence from value
680                    let sequence = u64::from_bytes_frame(value_frame)
681                        .map_err(|_| "invalid sequence value in map")?;
682
683                    partition_sequences.insert(partition_id, sequence);
684                }
685
686                Ok(FromSequenceSpec::PerPartition(partition_sequences))
687            }
688
689            // RESP3 Array format: [["0", "501"], ["1", "1230"]]
690            BytesFrame::Array { data, .. } => {
691                let mut partition_sequences = HashMap::new();
692
693                for item in data {
694                    if let BytesFrame::Array { data: pair, .. } = item {
695                        if pair.len() != 2 {
696                            return Err(
697                                "expected [partition_id, sequence] pairs in array".to_string()
698                            );
699                        }
700
701                        // Parse partition ID from first element
702                        let partition_id = match &pair[0] {
703                            BytesFrame::SimpleString { data, .. }
704                            | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
705                                .map_err(|_| "invalid UTF-8 in partition ID")?
706                                .parse::<u16>()
707                                .map_err(|_| "invalid partition ID in array pair")?,
708                            BytesFrame::Number { data, .. } => {
709                                if *data < 0 || *data > u16::MAX as i64 {
710                                    return Err(
711                                        "partition ID out of range in array pair".to_string()
712                                    );
713                                }
714                                *data as u16
715                            }
716                            _ => {
717                                return Err(
718                                    "invalid type for partition ID in array pair".to_string()
719                                );
720                            }
721                        };
722
723                        // Parse sequence from second element
724                        let sequence = u64::from_bytes_frame(&pair[1])
725                            .map_err(|_| "invalid sequence in array pair")?;
726
727                        partition_sequences.insert(partition_id, sequence);
728                    } else {
729                        return Err(
730                            "expected array pairs in FROM_SEQUENCE array format".to_string()
731                        );
732                    }
733                }
734
735                Ok(FromSequenceSpec::PerPartition(partition_sequences))
736            }
737
738            _ => Err("expected number, map, or array for FROM_SEQUENCE".to_string()),
739        }
740    }
741}
742
743#[inline(always)]
744pub fn simple_str(s: impl Into<Bytes>) -> BytesFrame {
745    BytesFrame::SimpleString {
746        data: s.into(),
747        attributes: None,
748    }
749}
750
751#[inline(always)]
752pub fn blob_str(s: impl Into<Bytes>) -> BytesFrame {
753    BytesFrame::BlobString {
754        data: s.into(),
755        attributes: None,
756    }
757}
758
759#[inline(always)]
760pub fn number(n: i64) -> BytesFrame {
761    BytesFrame::Number {
762        data: n,
763        attributes: None,
764    }
765}
766
767#[inline(always)]
768pub fn map(items: HashMap<BytesFrame, BytesFrame>) -> BytesFrame {
769    BytesFrame::Map {
770        data: items,
771        attributes: None,
772    }
773}
774
775#[inline(always)]
776pub fn array(items: Vec<BytesFrame>) -> BytesFrame {
777    BytesFrame::Array {
778        data: items,
779        attributes: None,
780    }
781}
782
783#[inline(always)]
784pub fn encode_event(record: EventRecord) -> BytesFrame {
785    map(HashMap::from_iter([
786        (
787            simple_str("event_id"),
788            simple_str(record.event_id.to_string()),
789        ),
790        (
791            simple_str("partition_key"),
792            simple_str(record.partition_key.to_string()),
793        ),
794        (
795            simple_str("partition_id"),
796            number(record.partition_id as i64),
797        ),
798        (
799            simple_str("transaction_id"),
800            simple_str(record.transaction_id.to_string()),
801        ),
802        (
803            simple_str("partition_sequence"),
804            number(record.partition_sequence as i64),
805        ),
806        (
807            simple_str("stream_version"),
808            number(record.stream_version as i64),
809        ),
810        (
811            simple_str("timestamp"),
812            number((record.timestamp / 1_000_000) as i64),
813        ),
814        (
815            simple_str("stream_id"),
816            blob_str(record.stream_id.to_string()),
817        ),
818        (simple_str("event_name"), blob_str(record.event_name)),
819        (simple_str("metadata"), blob_str(record.metadata)),
820        (simple_str("payload"), blob_str(record.payload)),
821    ]))
822}