Skip to main content

sierradb_server/
request.rs

1pub mod eack;
2pub mod eappend;
3pub mod eget;
4pub mod emappend;
5pub mod epscan;
6pub mod epseq;
7pub mod epsub;
8pub mod escan;
9pub mod esub;
10pub mod esver;
11pub mod hello;
12pub mod info;
13pub mod ping;
14
15use std::collections::HashMap;
16use std::num::{ParseIntError, TryFromIntError};
17
18use bytes::Bytes;
19use combine::{Parser, eof};
20use indexmap::{IndexMap, indexmap};
21use redis_protocol::resp3::types::{BytesFrame, VerbatimStringFormat};
22use sierradb::StreamId;
23use sierradb::bucket::PartitionId;
24use sierradb::bucket::segment::EventRecord;
25use sierradb::database::ExpectedVersion;
26use sierradb::id::uuid_to_partition_hash;
27use sierradb_cluster::subscription::{FromSequences, SubscriptionMatcher};
28use sierradb_protocol::ErrorCode;
29use tokio::io;
30use tracing::debug;
31use uuid::Uuid;
32
33use crate::parser::frame_stream;
34use crate::request::eack::EAck;
35use crate::request::eappend::EAppend;
36use crate::request::eget::EGet;
37use crate::request::emappend::EMAppend;
38use crate::request::epscan::EPScan;
39use crate::request::epseq::EPSeq;
40use crate::request::epsub::EPSub;
41use crate::request::escan::EScan;
42use crate::request::esub::ESub;
43use crate::request::esver::ESVer;
44use crate::request::hello::Hello;
45use crate::request::info::Info;
46use crate::request::ping::Ping;
47use crate::server::Conn;
48
/// The set of commands this module can dispatch, one variant per command name.
///
/// A variant is obtained from a command-name frame through the
/// `TryFrom<&BytesFrame>` implementation and executed with `Command::handle`.
///
/// The enum carries no data, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Command {
    /// `EACK`
    EAck,
    /// `EAPPEND`
    EAppend,
    /// `EGET`
    EGet,
    /// `EMAPPEND`
    EMAppend,
    /// `EPSCAN`
    EPScan,
    /// `EPSEQ`
    EPSeq,
    /// `EPSUB`
    EPSub,
    /// `ESVER`
    ESVer,
    /// `ESCAN`
    EScan,
    /// `ESUB`
    ESub,
    /// `HELLO`
    Hello,
    /// `INFO`
    Info,
    /// `PING`
    Ping,
}
64
65impl Command {
66    pub async fn handle(
67        &self,
68        args: &[BytesFrame],
69        conn: &mut Conn,
70    ) -> Result<Option<BytesFrame>, io::Error> {
71        macro_rules! handle_commands {
72            ( $( $name:ident ),* $(,)? ) => {
73                match self {
74                    $( Command::$name => {
75                        let stream = frame_stream(args);
76                        match $name::parser().skip(eof()).parse(stream) {
77                            Ok((cmd, _)) => cmd.handle_request_failable(conn).await,
78                            Err(err) => {
79                                Ok(Some(BytesFrame::SimpleError {
80                                    data: err.to_string().into(),
81                                    attributes: None,
82                                }))
83                            }
84                        }
85                    } )*
86                }
87            };
88        }
89
90        handle_commands![
91            EAck, EAppend, EGet, EMAppend, EPScan, EPSeq, EPSub, EScan, ESVer, ESub, Hello, Info,
92            Ping
93        ]
94    }
95}
96
97impl TryFrom<&BytesFrame> for Command {
98    type Error = String;
99
100    fn try_from(frame: &BytesFrame) -> Result<Self, Self::Error> {
101        match frame {
102            BytesFrame::BlobString { data, .. }
103            | BytesFrame::SimpleString { data, .. }
104            | BytesFrame::BigNumber { data, .. }
105            | BytesFrame::VerbatimString {
106                data,
107                format: VerbatimStringFormat::Text,
108                ..
109            } => {
110                match str::from_utf8(data)
111                    .map_err(|_| "invalid command".to_string())?
112                    .to_ascii_uppercase()
113                    .as_str()
114                {
115                    "EACK" => Ok(Command::EAck),
116                    "EAPPEND" => Ok(Command::EAppend),
117                    "EGET" => Ok(Command::EGet),
118                    "EMAPPEND" => Ok(Command::EMAppend),
119                    "EPSCAN" => Ok(Command::EPScan),
120                    "EPSEQ" => Ok(Command::EPSeq),
121                    "EPSUB" => Ok(Command::EPSub),
122                    "ESVER" => Ok(Command::ESVer),
123                    "ESCAN" => Ok(Command::EScan),
124                    "ESUB" => Ok(Command::ESub),
125                    "HELLO" => Ok(Command::Hello),
126                    "INFO" => Ok(Command::Info),
127                    "PING" => Ok(Command::Ping),
128                    cmd => {
129                        debug!("received unknown command {cmd}");
130                        Err(ErrorCode::InvalidArg.with_message(format!("unknown command '{cmd}'")))
131                    }
132                }
133            }
134            _ => Err(ErrorCode::InvalidArg.with_message("invalid type for command name")),
135        }
136    }
137}
138
139pub trait HandleRequest: Sized + Send {
140    type Ok: Into<BytesFrame>;
141    type Error: ToString;
142
143    fn handle_request(
144        self,
145        conn: &mut Conn,
146    ) -> impl Future<Output = Result<Option<Self::Ok>, Self::Error>> + Send;
147
148    fn handle_request_failable(
149        self,
150        conn: &mut Conn,
151    ) -> impl Future<Output = Result<Option<BytesFrame>, io::Error>> + Send {
152        async move {
153            match self.handle_request(conn).await {
154                Ok(Some(resp)) => Ok(Some(resp.into())),
155                Ok(None) => Ok(None),
156                Err(err) => Ok(Some(BytesFrame::SimpleError {
157                    data: err.to_string().into(),
158                    attributes: None,
159                })),
160            }
161        }
162    }
163}
164
/// Construction of a value from a slice of argument frames.
pub trait FromArgs: Sized {
    /// Builds `Self` from `args`, or returns an error message describing why
    /// the arguments were rejected.
    fn from_args(args: &[BytesFrame]) -> Result<Self, String>;
}
168
169impl FromArgs for SubscriptionMatcher {
170    fn from_args(args: &[BytesFrame]) -> Result<Self, String> {
171        let mut i = 0;
172        let kind = <&str>::from_bytes_frame(
173            args.get(i)
174                .ok_or_else(|| ErrorCode::InvalidArg.with_message("missing subscription type"))?,
175        )
176        .map_err(|err| {
177            ErrorCode::InvalidArg.with_message(format!("invalid subscription type: {err}"))
178        })?;
179        i += 1;
180        match kind {
181            "ALL_PARTITIONS" | "all_partitions" => {
182                let from_sequences_kind =
183                    <&str>::from_bytes_frame(args.get(i).ok_or_else(|| {
184                        ErrorCode::InvalidArg.with_message("missing start filter")
185                    })?)
186                    .map_err(|err| {
187                        ErrorCode::InvalidArg.with_message(format!("invalid start filter: {err}"))
188                    })?;
189                i += 1;
190                match from_sequences_kind {
191                    "LATEST" | "latest" => Ok(SubscriptionMatcher::AllPartitions {
192                        from_sequences: FromSequences::Latest,
193                    }),
194                    "ALL" | "all" => {
195                        let from_sequence =
196                            u64::from_bytes_frame(args.get(i).ok_or_else(|| {
197                                ErrorCode::InvalidArg.with_message("missing start sequence")
198                            })?)
199                            .map_err(|err| {
200                                ErrorCode::InvalidArg
201                                    .with_message(format!("invalid start sequence: {err}"))
202                            })?;
203
204                        Ok(SubscriptionMatcher::AllPartitions {
205                            from_sequences: FromSequences::AllPartitions(from_sequence),
206                        })
207                    }
208                    "PARTITIONS" | "partitions" => {
209                        let mut from_sequences = HashMap::new();
210
211                        loop {
212                            let Some(arg) = args.get(i) else {
213                                break;
214                            };
215
216                            i += 1;
217
218                            match PartitionId::from_bytes_frame(arg) {
219                                Ok(partition_id) => {
220                                    let from_sequence =
221                                        u64::from_bytes_frame(args.get(i).ok_or_else(|| {
222                                            ErrorCode::InvalidArg
223                                                .with_message("missing from sequence")
224                                        })?)
225                                        .map_err(
226                                            |err| {
227                                                ErrorCode::InvalidArg.with_message(format!(
228                                                    "invalid from sequence: {err}"
229                                                ))
230                                            },
231                                        )?;
232
233                                    i += 1;
234
235                                    from_sequences.insert(partition_id, from_sequence);
236                                }
237                                Err(err) => {
238                                    let Ok(fallback_keyword) = <&str>::from_bytes_frame(arg) else {
239                                        return Err(err);
240                                    };
241                                    if fallback_keyword != "FALLBACK"
242                                        && fallback_keyword != "fallback"
243                                    {
244                                        return Err(err);
245                                    }
246
247                                    let fallback =
248                                        u64::from_bytes_frame(args.get(i).ok_or_else(|| {
249                                            ErrorCode::InvalidArg
250                                                .with_message("missing fallback sequence")
251                                        })?)
252                                        .map_err(
253                                            |err| {
254                                                ErrorCode::InvalidArg.with_message(format!(
255                                                    "invalid fallback sequence: {err}"
256                                                ))
257                                            },
258                                        )?;
259
260                                    return Ok(SubscriptionMatcher::AllPartitions {
261                                        from_sequences: FromSequences::Partitions {
262                                            from_sequences,
263                                            fallback: Some(fallback),
264                                        },
265                                    });
266                                }
267                            }
268                        }
269
270                        Ok(SubscriptionMatcher::AllPartitions {
271                            from_sequences: FromSequences::Partitions {
272                                from_sequences,
273                                fallback: None,
274                            },
275                        })
276                    }
277                    _ => Err(ErrorCode::InvalidArg
278                        .with_message(format!("unknown start filter '{from_sequences_kind}'"))),
279                }
280            }
281            "PARTITIONS" | "partitions" => {
282                todo!()
283            }
284            "STREAMS" | "streams" => {
285                todo!()
286            }
287            _ => Err(ErrorCode::InvalidArg.with_message("unknown subscription type '{kind}'")),
288        }
289    }
290}
291
/// Conversion from a single RESP3 frame into a typed value.
///
/// The lifetime `'a` lets implementations return data borrowed from the frame
/// (see the `&'a str` and `&'a [u8]` implementations in this file).
pub trait FromBytesFrame<'a>: Sized {
    /// Converts `frame` into `Self`, or returns an error message describing why
    /// the frame was rejected.
    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String>;
}
295
296impl<'a, T> FromBytesFrame<'a> for Option<T>
297where
298    T: FromBytesFrame<'a>,
299{
300    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
301        match frame {
302            BytesFrame::Null => Ok(None),
303            _ => Ok(Some(T::from_bytes_frame(frame)?)),
304        }
305    }
306}
307
308impl FromBytesFrame<'_> for u32 {
309    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
310        match frame {
311            BytesFrame::BlobString { data, .. }
312            | BytesFrame::SimpleString { data, .. }
313            | BytesFrame::BigNumber { data, .. }
314            | BytesFrame::VerbatimString {
315                data,
316                format: VerbatimStringFormat::Text,
317                ..
318            } => {
319                let s = std::str::from_utf8(data).map_err(|_| "invalid string".to_string())?;
320                s.parse::<u32>().map_err(|_| "invalid u32".to_string())
321            }
322            BytesFrame::Number { data, .. } => {
323                if *data < 0 {
324                    Err("negative number for u32".to_string())
325                } else if *data > u32::MAX as i64 {
326                    Err("number too large for u32".to_string())
327                } else {
328                    Ok(*data as u32)
329                }
330            }
331            _ => Err("invalid type for u32".to_string()),
332        }
333    }
334}
335
336impl FromBytesFrame<'_> for i64 {
337    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
338        match frame {
339            BytesFrame::BlobString { data, .. }
340            | BytesFrame::SimpleString { data, .. }
341            | BytesFrame::BigNumber { data, .. }
342            | BytesFrame::VerbatimString {
343                data,
344                format: VerbatimStringFormat::Text,
345                ..
346            } => str::from_utf8(data)
347                .map_err(|err| err.to_string())?
348                .parse()
349                .map_err(|err: ParseIntError| err.to_string()),
350            BytesFrame::Number { data, .. } => Ok(*data),
351            _ => Err("unsupported type, expecting i64".to_string()),
352        }
353    }
354}
355
356impl FromBytesFrame<'_> for u64 {
357    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
358        match frame {
359            BytesFrame::BlobString { data, .. }
360            | BytesFrame::SimpleString { data, .. }
361            | BytesFrame::BigNumber { data, .. }
362            | BytesFrame::VerbatimString {
363                data,
364                format: VerbatimStringFormat::Text,
365                ..
366            } => str::from_utf8(data)
367                .map_err(|err| err.to_string())?
368                .parse()
369                .map_err(|err: ParseIntError| err.to_string()),
370            BytesFrame::Number { data, .. } => (*data)
371                .try_into()
372                .map_err(|err: TryFromIntError| err.to_string()),
373            _ => Err("unsupported type, expecting i64".to_string()),
374        }
375    }
376}
377
378impl FromBytesFrame<'_> for u16 {
379    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
380        match frame {
381            BytesFrame::BlobString { data, .. }
382            | BytesFrame::SimpleString { data, .. }
383            | BytesFrame::BigNumber { data, .. }
384            | BytesFrame::VerbatimString {
385                data,
386                format: VerbatimStringFormat::Text,
387                ..
388            } => str::from_utf8(data)
389                .map_err(|err| err.to_string())?
390                .parse()
391                .map_err(|err: ParseIntError| err.to_string()),
392            BytesFrame::Number { data, .. } => (*data)
393                .try_into()
394                .map_err(|err: TryFromIntError| err.to_string()),
395            _ => Err("unsupported type, expecting i64".to_string()),
396        }
397    }
398}
399
400impl<'a> FromBytesFrame<'a> for &'a str {
401    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
402        match frame {
403            BytesFrame::BlobString { data, .. }
404            | BytesFrame::SimpleString { data, .. }
405            | BytesFrame::BigNumber { data, .. }
406            | BytesFrame::VerbatimString {
407                data,
408                format: VerbatimStringFormat::Text,
409                ..
410            } => str::from_utf8(data).map_err(|err| err.to_string()),
411            _ => Err("unsupported type, expecting string".to_string()),
412        }
413    }
414}
415
416impl FromBytesFrame<'_> for String {
417    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
418        <&str>::from_bytes_frame(frame).map(ToOwned::to_owned)
419    }
420}
421
422impl<'a> FromBytesFrame<'a> for &'a [u8] {
423    fn from_bytes_frame(frame: &'a BytesFrame) -> Result<Self, String> {
424        match frame {
425            BytesFrame::BlobString { data, .. }
426            | BytesFrame::SimpleString { data, .. }
427            | BytesFrame::BigNumber { data, .. }
428            | BytesFrame::VerbatimString {
429                data,
430                format: VerbatimStringFormat::Text,
431                ..
432            } => Ok(data),
433            _ => Err("unsupported type, expecting bytes".to_string()),
434        }
435    }
436}
437
438impl FromBytesFrame<'_> for Vec<u8> {
439    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
440        <&[u8]>::from_bytes_frame(frame).map(ToOwned::to_owned)
441    }
442}
443
444impl FromBytesFrame<'_> for StreamId {
445    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
446        StreamId::new(<String>::from_bytes_frame(frame)?).map_err(|err| err.to_string())
447    }
448}
449
450impl FromBytesFrame<'_> for Uuid {
451    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
452        <&str>::from_bytes_frame(frame)?
453            .parse()
454            .map_err(|err: uuid::Error| err.to_string())
455    }
456}
457
458impl FromBytesFrame<'_> for ExpectedVersion {
459    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
460        <u64>::from_bytes_frame(frame)
461            .map(ExpectedVersion::Exact)
462            .or_else(|_| {
463                <&str>::from_bytes_frame(frame).and_then(|s| match s {
464                    "any" | "ANY" => Ok(ExpectedVersion::Any),
465                    "exists" | "EXISTS" => Ok(ExpectedVersion::Exists),
466                    "empty" | "EMPTY" => Ok(ExpectedVersion::Empty),
467                    _ => Err("unknown expected version value".to_string()),
468                })
469            })
470    }
471}
472
/// One bound of a range argument.
///
/// The textual forms accepted for each variant are defined by the
/// `FromBytesFrame` implementation for this type.
#[derive(Debug, Clone, PartialEq)]
pub enum RangeValue {
    /// Written as `"-"`.
    Start,
    /// Written as `"+"`.
    End,
    /// A specific number.
    Value(u64),
}
479
480impl FromBytesFrame<'_> for RangeValue {
481    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
482        <&str>::from_bytes_frame(frame)
483            .and_then(|s| match s {
484                "-" => Ok(RangeValue::Start),
485                "+" => Ok(RangeValue::End),
486                _ => Err(String::default()),
487            })
488            .or_else(|_| <u64>::from_bytes_frame(frame).map(RangeValue::Value))
489            .map_err(|_| "unknown range value, expected '-', '+', or number".to_string())
490    }
491}
492
/// Identifies a partition either directly or through a partition key.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PartitionSelector {
    /// An explicit partition ID, e.g. a value in 0-65535.
    ById(PartitionId),
    /// A partition key, e.g. `550e8400-e29b-41d4-a716-446655440000`, which is
    /// mapped to a partition ID by `into_partition_id`.
    ByKey(Uuid),
}
498
499impl PartitionSelector {
500    pub fn into_partition_id(self, num_partitions: u16) -> PartitionId {
501        match self {
502            PartitionSelector::ById(id) => id,
503            PartitionSelector::ByKey(key) => uuid_to_partition_hash(key) % num_partitions,
504        }
505    }
506}
507
508impl FromBytesFrame<'_> for PartitionSelector {
509    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
510        <Uuid>::from_bytes_frame(frame)
511            .map(PartitionSelector::ByKey)
512            .or_else(|_| <PartitionId>::from_bytes_frame(frame).map(PartitionSelector::ById))
513    }
514}
515
/// Represents a range of partitions for multi-partition subscriptions.
///
/// Values are turned into concrete partition IDs with `expand`.
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionRange {
    /// Single partition (backwards compatibility).
    Single(PartitionSelector),
    /// Range of partition IDs (inclusive on both ends): "0-127", "50-99".
    Range(u16, u16),
    /// Explicit list of partitions: "0,1,5,42".
    List(Vec<PartitionSelector>),
    /// All partitions: "*".
    All,
}
528
529impl PartitionRange {
530    /// Expand the range into a vector of concrete partition IDs
531    pub fn expand(&self, num_partitions: u16) -> Vec<PartitionId> {
532        match self {
533            PartitionRange::Single(selector) => {
534                vec![selector.into_partition_id(num_partitions)]
535            }
536            PartitionRange::Range(start, end) => {
537                let start = (*start).min(num_partitions.saturating_sub(1));
538                let end = (*end).min(num_partitions.saturating_sub(1));
539                if start <= end {
540                    (start..=end).collect()
541                } else {
542                    vec![]
543                }
544            }
545            PartitionRange::List(selectors) => selectors
546                .iter()
547                .map(|s| s.into_partition_id(num_partitions))
548                .collect(),
549            PartitionRange::All => (0..num_partitions).collect(),
550        }
551    }
552}
553
/// Specification for the FROM_SEQUENCE parameter in multi-partition
/// subscriptions.
#[derive(Debug, Clone)]
pub enum FromSequenceSpec {
    /// Same sequence for all partitions
    Single(u64),
    /// Individual sequences per partition: {partition_id: sequence}
    PerPartition(HashMap<u16, u64>),
}

impl FromSequenceSpec {
    /// Looks up the FROM_SEQUENCE value that applies to `partition_id`.
    ///
    /// `Single` applies to every partition. `PerPartition` returns `None` for
    /// partitions that have no entry in the map.
    pub fn get_sequence_for_partition(&self, partition_id: u16) -> Option<u64> {
        match self {
            Self::PerPartition(sequences) => sequences.get(&partition_id).copied(),
            Self::Single(sequence) => Some(*sequence),
        }
    }
}
572
573impl FromBytesFrame<'_> for PartitionRange {
574    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
575        // Try to parse as a string first for ranges, lists, and "*"
576        if let Ok(s) = <&str>::from_bytes_frame(frame) {
577            // Check for wildcard "*"
578            if s == "*" {
579                return Ok(PartitionRange::All);
580            }
581
582            // Check for range format "start-end"
583            if let Some(dash_pos) = s.find('-') {
584                let start_str = &s[..dash_pos];
585                let end_str = &s[dash_pos + 1..];
586
587                let start: u16 = start_str
588                    .parse()
589                    .map_err(|_| format!("invalid start partition ID in range: '{start_str}'"))?;
590                let end: u16 = end_str
591                    .parse()
592                    .map_err(|_| format!("invalid end partition ID in range: '{end_str}'"))?;
593
594                return Ok(PartitionRange::Range(start, end));
595            }
596
597            // Check for comma-separated list "0,1,5,42"
598            if s.contains(',') {
599                let mut selectors = Vec::new();
600                for part in s.split(',') {
601                    let part = part.trim();
602                    // Try UUID first, then partition ID
603                    let selector = if let Ok(uuid) = part.parse::<Uuid>() {
604                        PartitionSelector::ByKey(uuid)
605                    } else if let Ok(id) = part.parse::<u16>() {
606                        PartitionSelector::ById(id)
607                    } else {
608                        return Err(format!("invalid partition selector in list: '{part}'"));
609                    };
610                    selectors.push(selector);
611                }
612                return Ok(PartitionRange::List(selectors));
613            }
614        }
615
616        // Fall back to single partition selector (backwards compatibility)
617        PartitionSelector::from_bytes_frame(frame).map(PartitionRange::Single)
618    }
619}
620
621impl FromBytesFrame<'_> for FromSequenceSpec {
622    fn from_bytes_frame(frame: &BytesFrame) -> Result<Self, String> {
623        // Try parsing as a single number first
624        if let Ok(sequence) = u64::from_bytes_frame(frame) {
625            return Ok(FromSequenceSpec::Single(sequence));
626        }
627
628        // Try parsing as string (could be comma-separated pairs or single number)
629        if let Ok(s) = <&str>::from_bytes_frame(frame) {
630            // Try parsing as single number first
631            if let Ok(sequence) = s.parse::<u64>() {
632                return Ok(FromSequenceSpec::Single(sequence));
633            }
634
635            // Parse as "partition:sequence,partition:sequence" format
636            if s.contains(':') {
637                let mut partition_sequences = HashMap::new();
638
639                for pair in s.split(',') {
640                    let parts: Vec<&str> = pair.split(':').collect();
641                    if parts.len() != 2 {
642                        return Err(format!("invalid partition:sequence pair: '{pair}'"));
643                    }
644
645                    let partition_id: u16 = parts[0]
646                        .parse()
647                        .map_err(|_| format!("invalid partition ID: '{}'", parts[0]))?;
648                    let sequence: u64 = parts[1]
649                        .parse()
650                        .map_err(|_| format!("invalid sequence number: '{}'", parts[1]))?;
651
652                    partition_sequences.insert(partition_id, sequence);
653                }
654
655                if partition_sequences.is_empty() {
656                    return Err("no valid partition:sequence pairs found".to_string());
657                }
658
659                return Ok(FromSequenceSpec::PerPartition(partition_sequences));
660            }
661        }
662
663        match frame {
664            // RESP3 Map format: {"0": 501, "1": 1230}
665            BytesFrame::Map { data, .. } => {
666                let mut partition_sequences = HashMap::new();
667
668                for (key_frame, value_frame) in data {
669                    // Parse partition ID from key
670                    let partition_id = match key_frame {
671                        BytesFrame::SimpleString { data, .. }
672                        | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
673                            .map_err(|_| "invalid UTF-8 in partition ID key")?
674                            .parse::<u16>()
675                            .map_err(|_| "invalid partition ID in map key")?,
676                        BytesFrame::Number { data, .. } => {
677                            if *data < 0 || *data > u16::MAX as i64 {
678                                return Err("partition ID out of range".to_string());
679                            }
680                            *data as u16
681                        }
682                        _ => return Err("invalid type for partition ID key in map".to_string()),
683                    };
684
685                    // Parse sequence from value
686                    let sequence = u64::from_bytes_frame(value_frame)
687                        .map_err(|_| "invalid sequence value in map")?;
688
689                    partition_sequences.insert(partition_id, sequence);
690                }
691
692                Ok(FromSequenceSpec::PerPartition(partition_sequences))
693            }
694
695            // RESP3 Array format: [["0", "501"], ["1", "1230"]]
696            BytesFrame::Array { data, .. } => {
697                let mut partition_sequences = HashMap::new();
698
699                for item in data {
700                    if let BytesFrame::Array { data: pair, .. } = item {
701                        if pair.len() != 2 {
702                            return Err(
703                                "expected [partition_id, sequence] pairs in array".to_string()
704                            );
705                        }
706
707                        // Parse partition ID from first element
708                        let partition_id = match &pair[0] {
709                            BytesFrame::SimpleString { data, .. }
710                            | BytesFrame::BlobString { data, .. } => std::str::from_utf8(data)
711                                .map_err(|_| "invalid UTF-8 in partition ID")?
712                                .parse::<u16>()
713                                .map_err(|_| "invalid partition ID in array pair")?,
714                            BytesFrame::Number { data, .. } => {
715                                if *data < 0 || *data > u16::MAX as i64 {
716                                    return Err(
717                                        "partition ID out of range in array pair".to_string()
718                                    );
719                                }
720                                *data as u16
721                            }
722                            _ => {
723                                return Err(
724                                    "invalid type for partition ID in array pair".to_string()
725                                );
726                            }
727                        };
728
729                        // Parse sequence from second element
730                        let sequence = u64::from_bytes_frame(&pair[1])
731                            .map_err(|_| "invalid sequence in array pair")?;
732
733                        partition_sequences.insert(partition_id, sequence);
734                    } else {
735                        return Err(
736                            "expected array pairs in FROM_SEQUENCE array format".to_string()
737                        );
738                    }
739                }
740
741                Ok(FromSequenceSpec::PerPartition(partition_sequences))
742            }
743
744            _ => Err("expected number, map, or array for FROM_SEQUENCE".to_string()),
745        }
746    }
747}
748
749#[inline(always)]
750pub fn simple_str(s: impl Into<Bytes>) -> BytesFrame {
751    BytesFrame::SimpleString {
752        data: s.into(),
753        attributes: None,
754    }
755}
756
757#[inline(always)]
758pub fn blob_str(s: impl Into<Bytes>) -> BytesFrame {
759    BytesFrame::BlobString {
760        data: s.into(),
761        attributes: None,
762    }
763}
764
765#[inline(always)]
766pub fn number(n: i64) -> BytesFrame {
767    BytesFrame::Number {
768        data: n,
769        attributes: None,
770    }
771}
772
773#[inline(always)]
774pub fn double(n: f64) -> BytesFrame {
775    BytesFrame::Double {
776        data: n,
777        attributes: None,
778    }
779}
780
781#[inline(always)]
782pub fn map(items: IndexMap<BytesFrame, BytesFrame>) -> BytesFrame {
783    BytesFrame::Map {
784        data: items,
785        attributes: None,
786    }
787}
788
789#[inline(always)]
790pub fn array(items: Vec<BytesFrame>) -> BytesFrame {
791    BytesFrame::Array {
792        data: items,
793        attributes: None,
794    }
795}
796
797#[inline(always)]
798pub fn encode_event(record: EventRecord) -> BytesFrame {
799    map(indexmap! {
800        simple_str("event_id") => simple_str(record.event_id.to_string()),
801        simple_str("partition_key") => simple_str(record.partition_key.to_string()),
802        simple_str("partition_id") => number(record.partition_id as i64),
803        simple_str("transaction_id") => simple_str(record.transaction_id.to_string()),
804        simple_str("partition_sequence") => number(record.partition_sequence as i64),
805        simple_str("stream_version") => number(record.stream_version as i64),
806        simple_str("timestamp") => number((record.timestamp / 1_000_000) as i64),
807        simple_str("stream_id") => blob_str(record.stream_id.to_string()),
808        simple_str("event_name") => blob_str(record.event_name),
809        simple_str("metadata") => blob_str(record.metadata),
810        simple_str("payload") => blob_str(record.payload),
811    })
812}