Skip to main content

sierradb_client/
types.rs

1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use redis::{FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
/// Decodes a Redis [`Value`] into a `String` or a `Uuid`.
///
/// Invoked as `parse_value!(String, val, "field")` or `parse_value!(Uuid, val, "field")`,
/// where `val` is an identifier bound to the value to decode and `"field"` is a literal
/// that is spliced into the error text at compile time. Both forms evaluate to a
/// `Result<_, RedisError>`; every failure uses `redis::ErrorKind::TypeError`.
macro_rules! parse_value {
    // Text: simple strings are copied as-is, bulk strings must be valid UTF-8.
    (String, $value:ident, $field:literal) => {
        match $value {
            Value::BulkString(bytes) => match String::from_utf8(bytes.clone()) {
                Ok(text) => Ok(text),
                Err(_) => Err(RedisError::from((
                    redis::ErrorKind::TypeError,
                    concat!("Invalid string for ", $field),
                ))),
            },
            Value::SimpleString(text) => Ok(text.clone()),
            _ => Err(RedisError::from((
                redis::ErrorKind::TypeError,
                concat!($field, " must be a string"),
            ))),
        }
    };
    // UUID: the textual form is parsed from either string flavour.
    (Uuid, $value:ident, $field:literal) => {
        match $value {
            Value::SimpleString(text) => match Uuid::parse_str(text) {
                Ok(id) => Ok(id),
                Err(_) => Err(RedisError::from((
                    redis::ErrorKind::TypeError,
                    concat!("Invalid UUID format for ", $field),
                ))),
            },
            Value::BulkString(bytes) => {
                // Invalid UTF-8 and an unparsable UUID share one error message.
                let parsed = std::str::from_utf8(bytes)
                    .ok()
                    .and_then(|text| Uuid::parse_str(text).ok());
                match parsed {
                    Some(id) => Ok(id),
                    None => Err(RedisError::from((
                        redis::ErrorKind::TypeError,
                        concat!("Invalid UUID format for ", $field),
                    ))),
                }
            }
            _ => Err(RedisError::from((
                redis::ErrorKind::TypeError,
                concat!($field, " must be a string"),
            ))),
        }
    };
}
47
48/// Represents messages received from SierraDB subscriptions.
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub enum SierraMessage {
51    /// An event received from a subscription with cursor information.
52    Event { event: Event, cursor: u64 },
53    /// Confirmation that a subscription was successfully created.
54    SubscriptionConfirmed { subscription_count: i64 },
55}
56
57#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
58pub struct HelloResp {
59    pub server: String,
60    pub version: String,
61    pub peer_id: String,
62    pub num_partitions: u16,
63}
64
65impl FromRedisValue for HelloResp {
66    fn from_redis_value(value: &Value) -> RedisResult<Self> {
67        match value {
68            Value::Map(fields) => {
69                let mut server = None;
70                let mut version = None;
71                let mut peer_id = None;
72                let mut num_partitions = None;
73
74                // Extract fields from map
75                for (key, val) in fields {
76                    let field_name = match key {
77                        Value::SimpleString(s) => s.as_str(),
78                        Value::BulkString(s) => std::str::from_utf8(s).map_err(|_| {
79                            RedisError::from((
80                                redis::ErrorKind::TypeError,
81                                "Invalid string for hello resp map key",
82                            ))
83                        })?,
84                        _ => continue,
85                    };
86
87                    match field_name {
88                        "server" => {
89                            server = Some(parse_value!(String, val, "server")?);
90                        }
91                        "version" => {
92                            version = Some(parse_value!(String, val, "version")?);
93                        }
94                        "peer_id" => {
95                            peer_id = Some(parse_value!(String, val, "peer_id")?);
96                        }
97                        "num_partitions" => {
98                            num_partitions = Some(match val {
99                                Value::Int(n) => *n as u16,
100                                _ => {
101                                    return Err(RedisError::from((
102                                        redis::ErrorKind::TypeError,
103                                        "num_partitions must be an integer",
104                                    )));
105                                }
106                            });
107                        }
108                        _ => {} // Ignore unknown fields
109                    }
110                }
111
112                // Ensure all required fields are present
113                let server = server.ok_or_else(|| {
114                    RedisError::from((
115                        redis::ErrorKind::TypeError,
116                        "Missing required field: server",
117                    ))
118                })?;
119                let version = version.ok_or_else(|| {
120                    RedisError::from((
121                        redis::ErrorKind::TypeError,
122                        "Missing required field: version",
123                    ))
124                })?;
125                let peer_id = peer_id.ok_or_else(|| {
126                    RedisError::from((
127                        redis::ErrorKind::TypeError,
128                        "Missing required field: peer_id",
129                    ))
130                })?;
131                let num_partitions = num_partitions.ok_or_else(|| {
132                    RedisError::from((
133                        redis::ErrorKind::TypeError,
134                        "Missing required field: num_partitions",
135                    ))
136                })?;
137
138                Ok(HelloResp {
139                    server,
140                    version,
141                    peer_id,
142                    num_partitions,
143                })
144            }
145            _ => Err(RedisError::from((
146                redis::ErrorKind::TypeError,
147                "Hello resp must be a Redis map",
148            ))),
149        }
150    }
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
154pub struct AppendInfo {
155    pub event_id: Uuid,
156    pub partition_key: Uuid,
157    pub partition_id: u16,
158    pub partition_sequence: u64,
159    pub stream_version: u64,
160    pub timestamp: SystemTime,
161}
162
163impl FromRedisValue for AppendInfo {
164    fn from_redis_value(value: &Value) -> RedisResult<Self> {
165        match value {
166            Value::Map(fields) => {
167                let mut event_id = None;
168                let mut partition_key = None;
169                let mut partition_id = None;
170                let mut partition_sequence = None;
171                let mut stream_version = None;
172                let mut timestamp = None;
173
174                // Extract fields from map
175                for (key, val) in fields {
176                    let field_name = match key {
177                        Value::SimpleString(s) => s.as_str(),
178                        _ => continue,
179                    };
180
181                    match field_name {
182                        "event_id" => {
183                            event_id = Some(parse_value!(Uuid, val, "event_id")?);
184                        }
185                        "partition_key" => {
186                            partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
187                        }
188                        "partition_id" => {
189                            partition_id = Some(match val {
190                                Value::Int(n) => *n as u16,
191                                _ => {
192                                    return Err(RedisError::from((
193                                        redis::ErrorKind::TypeError,
194                                        "partition_id must be an integer",
195                                    )));
196                                }
197                            });
198                        }
199                        "partition_sequence" => {
200                            partition_sequence = Some(match val {
201                                Value::Int(n) => *n as u64,
202                                _ => {
203                                    return Err(RedisError::from((
204                                        redis::ErrorKind::TypeError,
205                                        "partition_sequence must be an integer",
206                                    )));
207                                }
208                            });
209                        }
210                        "stream_version" => {
211                            stream_version = Some(match val {
212                                Value::Int(n) => *n as u64,
213                                _ => {
214                                    return Err(RedisError::from((
215                                        redis::ErrorKind::TypeError,
216                                        "stream_version must be an integer",
217                                    )));
218                                }
219                            });
220                        }
221                        "timestamp" => {
222                            timestamp = Some(match val {
223                                Value::Int(ms) => {
224                                    let duration = Duration::from_millis(*ms as u64);
225                                    UNIX_EPOCH + duration
226                                }
227                                _ => {
228                                    return Err(RedisError::from((
229                                        redis::ErrorKind::TypeError,
230                                        "timestamp must be an integer",
231                                    )));
232                                }
233                            });
234                        }
235                        _ => {} // Ignore unknown fields
236                    }
237                }
238
239                // Ensure all required fields are present
240                let event_id = event_id.ok_or_else(|| {
241                    RedisError::from((
242                        redis::ErrorKind::TypeError,
243                        "Missing required field: event_id",
244                    ))
245                })?;
246                let partition_key = partition_key.ok_or_else(|| {
247                    RedisError::from((
248                        redis::ErrorKind::TypeError,
249                        "Missing required field: partition_key",
250                    ))
251                })?;
252                let partition_id = partition_id.ok_or_else(|| {
253                    RedisError::from((
254                        redis::ErrorKind::TypeError,
255                        "Missing required field: partition_id",
256                    ))
257                })?;
258                let partition_sequence = partition_sequence.ok_or_else(|| {
259                    RedisError::from((
260                        redis::ErrorKind::TypeError,
261                        "Missing required field: partition_sequence",
262                    ))
263                })?;
264                let stream_version = stream_version.ok_or_else(|| {
265                    RedisError::from((
266                        redis::ErrorKind::TypeError,
267                        "Missing required field: stream_version",
268                    ))
269                })?;
270                let timestamp = timestamp.ok_or_else(|| {
271                    RedisError::from((
272                        redis::ErrorKind::TypeError,
273                        "Missing required field: timestamp",
274                    ))
275                })?;
276
277                Ok(AppendInfo {
278                    event_id,
279                    partition_key,
280                    partition_id,
281                    partition_sequence,
282                    stream_version,
283                    timestamp,
284                })
285            }
286            _ => Err(RedisError::from((
287                redis::ErrorKind::TypeError,
288                "Append info must be a Redis map",
289            ))),
290        }
291    }
292}
293
294#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
295pub struct EventBatch {
296    pub events: Vec<Event>,
297    pub has_more: bool,
298}
299
300impl FromRedisValue for EventBatch {
301    fn from_redis_value(value: &Value) -> RedisResult<Self> {
302        match value {
303            Value::Map(fields) => {
304                let mut has_more = None;
305                let mut events = None;
306
307                // Extract fields from map
308                for (key, val) in fields {
309                    let field_name = match key {
310                        Value::SimpleString(s) => s.as_str(),
311                        _ => continue,
312                    };
313
314                    match field_name {
315                        "has_more" => {
316                            has_more = Some(match val {
317                                Value::Boolean(b) => *b,
318                                _ => {
319                                    return Err(RedisError::from((
320                                        redis::ErrorKind::TypeError,
321                                        "has_more must be a boolean",
322                                    )));
323                                }
324                            });
325                        }
326                        "events" => {
327                            events = Some(match val {
328                                Value::Array(event_values) => event_values
329                                    .iter()
330                                    .map(Event::from_redis_value)
331                                    .collect::<Result<Vec<_>, _>>()?,
332                                _ => {
333                                    return Err(RedisError::from((
334                                        redis::ErrorKind::TypeError,
335                                        "events must be an array",
336                                    )));
337                                }
338                            });
339                        }
340                        _ => {} // Ignore unknown fields
341                    }
342                }
343
344                // Ensure all required fields are present
345                let has_more = has_more.ok_or_else(|| {
346                    RedisError::from((
347                        redis::ErrorKind::TypeError,
348                        "Missing required field: has_more",
349                    ))
350                })?;
351                let events = events.ok_or_else(|| {
352                    RedisError::from((
353                        redis::ErrorKind::TypeError,
354                        "Missing required field: events",
355                    ))
356                })?;
357
358                Ok(EventBatch { events, has_more })
359            }
360            _ => Err(RedisError::from((
361                redis::ErrorKind::TypeError,
362                "Event batch must be a Redis map",
363            ))),
364        }
365    }
366}
367
368/// Represents a single event record in the event store.
369#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
370pub struct Event {
371    /// Globally unique identifier for this specific event.
372    /// Generated once when the event is created and never changes.
373    pub event_id: Uuid,
374
375    /// Determines which partition this event belongs to.
376    ///
377    /// This is typically a domain-significant identifier (like customer ID,
378    /// tenant ID) that groups related events together. All events for the
379    /// same stream must share the same partition key.
380    pub partition_key: Uuid,
381
382    /// The numeric partition identifier (0-1023) derived from the
383    /// partition_key.
384    ///
385    /// Events with the same partition_id have a guaranteed total ordering
386    /// defined by their partition_sequence, regardless of which stream they
387    /// belong to.
388    pub partition_id: u16,
389
390    /// Identifier for multi-event transactions.
391    ///
392    /// When multiple events are saved as part of a single transaction, they
393    /// share this identifier. For events not part of a transaction, this
394    /// may be a null UUID.
395    pub transaction_id: Uuid,
396
397    /// The monotonic, gapless sequence number within the partition.
398    ///
399    /// This defines the total ordering of events within a partition. Each new
400    /// event in a partition receives a sequence number exactly one higher
401    /// than the previous event.
402    pub partition_sequence: u64,
403
404    /// The version number of the entity/aggregate after this event is applied.
405    ///
406    /// This is a monotonic, gapless counter specific to the stream. It starts
407    /// at 0 and increments by 1 for each event in the stream. Used for
408    /// optimistic concurrency control and to determine the current state
409    /// version of an entity.
410    pub stream_version: u64,
411
412    /// Unix timestamp (in milliseconds) when the event was created.
413    ///
414    /// Useful for time-based queries and analysis, though not used for event
415    /// ordering.
416    pub timestamp: SystemTime,
417
418    /// Identifier for the stream (entity/aggregate) this event belongs to.
419    ///
420    /// Typically corresponds to a domain entity ID, like "account-123" or
421    /// "order-456". All events for the same entity share the same
422    /// stream_id.
423    pub stream_id: String,
424
425    /// Name of the event type, used for deserialization and event handling.
426    ///
427    /// Examples: "AccountCreated", "OrderShipped", "PaymentRefunded".
428    /// Should be meaningful in the domain context.
429    pub event_name: String,
430
431    /// Additional system or application metadata about the event.
432    ///
433    /// May include information like user ID, correlation IDs, causation IDs,
434    /// or other contextual data not part of the event payload itself.
435    pub metadata: Vec<u8>,
436
437    /// The actual event data serialized as bytes.
438    ///
439    /// Contains the domain-specific information that constitutes the event.
440    /// Must be deserializable based on the event_name.
441    pub payload: Vec<u8>,
442}
443
444impl Event {
445    /// Get the sequence/version number to use for acknowledgment.
446    ///
447    /// For stream subscriptions, this returns the stream_version.
448    /// For partition subscriptions, this returns the partition_sequence.
449    /// Use this value with `acknowledge_events()` to acknowledge this event.
450    pub fn sequence_or_version_for_stream(&self) -> u64 {
451        self.stream_version
452    }
453
454    /// Get the sequence number to use for partition subscription
455    /// acknowledgment.
456    ///
457    /// Use this value with `acknowledge_events()` when acknowledging
458    /// events from a partition subscription.
459    pub fn sequence_for_partition(&self) -> u64 {
460        self.partition_sequence
461    }
462
463    /// Get the version number to use for stream subscription acknowledgment.
464    ///
465    /// Use this value with `acknowledge_events()` when acknowledging
466    /// events from a stream subscription.
467    pub fn version_for_stream(&self) -> u64 {
468        self.stream_version
469    }
470}
471
472impl FromRedisValue for Event {
473    fn from_redis_value(value: &Value) -> RedisResult<Self> {
474        match value {
475            Value::Map(fields) => {
476                let mut event_id = None;
477                let mut partition_key = None;
478                let mut partition_id = None;
479                let mut transaction_id = None;
480                let mut partition_sequence = None;
481                let mut stream_version = None;
482                let mut timestamp = None;
483                let mut stream_id = None;
484                let mut event_name = None;
485                let mut metadata = None;
486                let mut payload = None;
487
488                // Extract fields from map
489                for (key, val) in fields {
490                    let field_name = match key {
491                        Value::SimpleString(s) => s.as_str(),
492                        _ => continue,
493                    };
494
495                    match field_name {
496                        "event_id" => {
497                            event_id = Some(parse_value!(Uuid, val, "event_id")?);
498                        }
499                        "partition_key" => {
500                            partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
501                        }
502                        "partition_id" => {
503                            partition_id = Some(match val {
504                                Value::Int(n) => *n as u16,
505                                _ => {
506                                    return Err(RedisError::from((
507                                        redis::ErrorKind::TypeError,
508                                        "partition_id must be an integer",
509                                    )));
510                                }
511                            });
512                        }
513                        "transaction_id" => {
514                            transaction_id = Some(parse_value!(Uuid, val, "transaction_id")?);
515                        }
516                        "partition_sequence" => {
517                            partition_sequence = Some(match val {
518                                Value::Int(n) => *n as u64,
519                                _ => {
520                                    return Err(RedisError::from((
521                                        redis::ErrorKind::TypeError,
522                                        "partition_sequence must be an integer",
523                                    )));
524                                }
525                            });
526                        }
527                        "stream_version" => {
528                            stream_version = Some(match val {
529                                Value::Int(n) => *n as u64,
530                                _ => {
531                                    return Err(RedisError::from((
532                                        redis::ErrorKind::TypeError,
533                                        "stream_version must be an integer",
534                                    )));
535                                }
536                            });
537                        }
538                        "timestamp" => {
539                            timestamp = Some(match val {
540                                Value::Int(ms) => {
541                                    let duration = Duration::from_millis(*ms as u64);
542                                    UNIX_EPOCH + duration
543                                }
544                                _ => {
545                                    return Err(RedisError::from((
546                                        redis::ErrorKind::TypeError,
547                                        "timestamp must be an integer",
548                                    )));
549                                }
550                            });
551                        }
552                        "stream_id" => {
553                            stream_id = Some(parse_value!(String, val, "stream_id")?);
554                        }
555                        "event_name" => {
556                            event_name = Some(parse_value!(String, val, "event_name")?);
557                        }
558                        "metadata" => {
559                            metadata = Some(match val {
560                                Value::BulkString(data) => data.clone(),
561                                Value::Nil => Vec::new(), // Handle null metadata
562                                _ => {
563                                    return Err(RedisError::from((
564                                        redis::ErrorKind::TypeError,
565                                        "metadata must be bulk data",
566                                    )));
567                                }
568                            });
569                        }
570                        "payload" => {
571                            payload = Some(match val {
572                                Value::BulkString(data) => data.clone(),
573                                Value::Nil => Vec::new(),
574                                _ => {
575                                    return Err(RedisError::from((
576                                        redis::ErrorKind::TypeError,
577                                        "payload must be bulk data",
578                                    )));
579                                }
580                            });
581                        }
582                        _ => {} // Ignore unknown fields
583                    }
584                }
585
586                // Ensure all required fields are present
587                let event_id = event_id.ok_or_else(|| {
588                    RedisError::from((
589                        redis::ErrorKind::TypeError,
590                        "Missing required field: event_id",
591                    ))
592                })?;
593                let partition_key = partition_key.ok_or_else(|| {
594                    RedisError::from((
595                        redis::ErrorKind::TypeError,
596                        "Missing required field: partition_key",
597                    ))
598                })?;
599                let partition_id = partition_id.ok_or_else(|| {
600                    RedisError::from((
601                        redis::ErrorKind::TypeError,
602                        "Missing required field: partition_id",
603                    ))
604                })?;
605                let transaction_id = transaction_id.ok_or_else(|| {
606                    RedisError::from((
607                        redis::ErrorKind::TypeError,
608                        "Missing required field: transaction_id",
609                    ))
610                })?;
611                let partition_sequence = partition_sequence.ok_or_else(|| {
612                    RedisError::from((
613                        redis::ErrorKind::TypeError,
614                        "Missing required field: partition_sequence",
615                    ))
616                })?;
617                let stream_version = stream_version.ok_or_else(|| {
618                    RedisError::from((
619                        redis::ErrorKind::TypeError,
620                        "Missing required field: stream_version",
621                    ))
622                })?;
623                let timestamp = timestamp.ok_or_else(|| {
624                    RedisError::from((
625                        redis::ErrorKind::TypeError,
626                        "Missing required field: timestamp",
627                    ))
628                })?;
629                let stream_id = stream_id.ok_or_else(|| {
630                    RedisError::from((
631                        redis::ErrorKind::TypeError,
632                        "Missing required field: stream_id",
633                    ))
634                })?;
635                let event_name = event_name.ok_or_else(|| {
636                    RedisError::from((
637                        redis::ErrorKind::TypeError,
638                        "Missing required field: event_name",
639                    ))
640                })?;
641                let metadata = metadata.ok_or_else(|| {
642                    RedisError::from((
643                        redis::ErrorKind::TypeError,
644                        "Missing required field: metadata",
645                    ))
646                })?;
647                let payload = payload.ok_or_else(|| {
648                    RedisError::from((
649                        redis::ErrorKind::TypeError,
650                        "Missing required field: payload",
651                    ))
652                })?;
653
654                Ok(Event {
655                    event_id,
656                    partition_key,
657                    partition_id,
658                    transaction_id,
659                    partition_sequence,
660                    stream_version,
661                    timestamp,
662                    stream_id,
663                    event_name,
664                    metadata,
665                    payload,
666                })
667            }
668            _ => Err(RedisError::from((
669                redis::ErrorKind::TypeError,
670                "Event must be a Redis map",
671            ))),
672        }
673    }
674}
675
/// One endpoint of a numeric range.
///
/// The derived ordering follows declaration order, so
/// `Start < Value(_) < End` and `Value`s compare by their number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum RangeValue {
    /// The open lower bound, written as `-`.
    Start,
    /// A specific number.
    Value(u64),
    /// The open upper bound, written as `+`.
    End,
}

impl From<u64> for RangeValue {
    /// Wraps a plain number as [`RangeValue::Value`].
    fn from(value: u64) -> Self {
        Self::Value(value)
    }
}
688
689impl ToRedisArgs for RangeValue {
690    fn write_redis_args<W>(&self, out: &mut W)
691    where
692        W: ?Sized + RedisWrite,
693    {
694        match self {
695            RangeValue::Start => out.write_arg(b"-"),
696            RangeValue::End => out.write_arg(b"+"),
697            RangeValue::Value(v) => out.write_arg(v.to_string().as_bytes()),
698        }
699    }
700}
701
702/// Information about a multi-event append transaction.
703#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
704pub struct MultiAppendInfo {
705    pub partition_key: Uuid,
706    pub partition_id: u16,
707    pub first_partition_sequence: u64,
708    pub last_partition_sequence: u64,
709    pub events: Vec<EventInfo>,
710}
711
712/// Information about a single event within a multi-append transaction.
713#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
714pub struct EventInfo {
715    pub event_id: Uuid,
716    pub partition_id: u16,
717    pub partition_sequence: u64,
718    pub stream_id: String,
719    pub stream_version: u64,
720    pub timestamp: SystemTime,
721}
722
723impl EventInfo {
724    fn from_redis_value_inner(value: &Value, is_multi: bool) -> RedisResult<Self> {
725        match value {
726            Value::Map(fields) => {
727                let mut event_id = None;
728                let mut partition_id = None;
729                let mut partition_sequence = None;
730                let mut stream_id = None;
731                let mut stream_version = None;
732                let mut timestamp = None;
733
734                // Extract fields from map
735                for (key, val) in fields {
736                    let field_name = match key {
737                        Value::SimpleString(s) => s.as_str(),
738                        _ => continue,
739                    };
740
741                    match field_name {
742                        "event_id" => {
743                            event_id = Some(parse_value!(Uuid, val, "event_id")?);
744                        }
745                        "partition_id" => {
746                            partition_id = Some(match val {
747                                Value::Int(n) => *n as u16,
748                                _ => {
749                                    return Err(RedisError::from((
750                                        redis::ErrorKind::TypeError,
751                                        "partition_id must be an integer",
752                                    )));
753                                }
754                            });
755                        }
756                        "partition_sequence" => {
757                            partition_sequence = Some(match val {
758                                Value::Int(n) => *n as u64,
759                                _ => {
760                                    return Err(RedisError::from((
761                                        redis::ErrorKind::TypeError,
762                                        "partition_sequence must be an integer",
763                                    )));
764                                }
765                            });
766                        }
767                        "stream_id" => {
768                            stream_id = Some(parse_value!(String, val, "stream_id")?);
769                        }
770                        "stream_version" => {
771                            stream_version = Some(match val {
772                                Value::Int(n) => *n as u64,
773                                _ => {
774                                    return Err(RedisError::from((
775                                        redis::ErrorKind::TypeError,
776                                        "stream_version must be an integer",
777                                    )));
778                                }
779                            });
780                        }
781                        "timestamp" => {
782                            timestamp = Some(match val {
783                                Value::Int(ms) => {
784                                    let duration = Duration::from_millis(*ms as u64);
785                                    UNIX_EPOCH + duration
786                                }
787                                _ => {
788                                    return Err(RedisError::from((
789                                        redis::ErrorKind::TypeError,
790                                        "timestamp must be an integer",
791                                    )));
792                                }
793                            });
794                        }
795                        _ => {} // Ignore unknown fields
796                    }
797                }
798
799                // Ensure all required fields are present
800                let event_id = event_id.ok_or_else(|| {
801                    RedisError::from((
802                        redis::ErrorKind::TypeError,
803                        "Missing required field: event_id",
804                    ))
805                })?;
806                let partition_id = partition_id.or(is_multi.then_some(0)).ok_or_else(|| {
807                    RedisError::from((
808                        redis::ErrorKind::TypeError,
809                        "Missing required field: partition_id",
810                    ))
811                })?;
812                let partition_sequence =
813                    partition_sequence
814                        .or(is_multi.then_some(0))
815                        .ok_or_else(|| {
816                            RedisError::from((
817                                redis::ErrorKind::TypeError,
818                                "Missing required field: partition_sequence",
819                            ))
820                        })?;
821                let stream_id = stream_id.ok_or_else(|| {
822                    RedisError::from((
823                        redis::ErrorKind::TypeError,
824                        "Missing required field: stream_id",
825                    ))
826                })?;
827                let stream_version = stream_version.ok_or_else(|| {
828                    RedisError::from((
829                        redis::ErrorKind::TypeError,
830                        "Missing required field: stream_version",
831                    ))
832                })?;
833                let timestamp = timestamp.ok_or_else(|| {
834                    RedisError::from((
835                        redis::ErrorKind::TypeError,
836                        "Missing required field: timestamp",
837                    ))
838                })?;
839
840                Ok(EventInfo {
841                    event_id,
842                    partition_id,
843                    partition_sequence,
844                    stream_id,
845                    stream_version,
846                    timestamp,
847                })
848            }
849            _ => Err(RedisError::from((
850                redis::ErrorKind::TypeError,
851                "Event info must be a Redis map",
852            ))),
853        }
854    }
855}
856
857impl FromRedisValue for EventInfo {
858    fn from_redis_value(value: &Value) -> RedisResult<Self> {
859        Self::from_redis_value_inner(value, false)
860    }
861}
862
863impl FromRedisValue for MultiAppendInfo {
864    fn from_redis_value(value: &Value) -> RedisResult<Self> {
865        match value {
866            Value::Map(fields) => {
867                let mut partition_key = None;
868                let mut partition_id = None;
869                let mut first_partition_sequence = None;
870                let mut last_partition_sequence = None;
871                let mut events = None;
872
873                // Extract fields from map
874                for (key, val) in fields {
875                    let field_name = match key {
876                        Value::SimpleString(s) => s.as_str(),
877                        _ => continue,
878                    };
879
880                    match field_name {
881                        "partition_key" => {
882                            partition_key = Some(parse_value!(Uuid, val, "partition_key")?);
883                        }
884                        "partition_id" => {
885                            partition_id = Some(match val {
886                                Value::Int(n) => *n as u16,
887                                _ => {
888                                    return Err(RedisError::from((
889                                        redis::ErrorKind::TypeError,
890                                        "partition_id must be an integer",
891                                    )));
892                                }
893                            });
894                        }
895                        "first_partition_sequence" => {
896                            first_partition_sequence = Some(match val {
897                                Value::Int(n) => *n as u64,
898                                _ => {
899                                    return Err(RedisError::from((
900                                        redis::ErrorKind::TypeError,
901                                        "first_partition_sequence must be an integer",
902                                    )));
903                                }
904                            });
905                        }
906                        "last_partition_sequence" => {
907                            last_partition_sequence = Some(match val {
908                                Value::Int(n) => *n as u64,
909                                _ => {
910                                    return Err(RedisError::from((
911                                        redis::ErrorKind::TypeError,
912                                        "last_partition_sequence must be an integer",
913                                    )));
914                                }
915                            });
916                        }
917                        "events" => {
918                            events = Some(match val {
919                                Value::Array(event_values) => event_values
920                                    .iter()
921                                    .map(|value| EventInfo::from_redis_value_inner(value, true))
922                                    .collect::<Result<Vec<_>, _>>()?,
923                                _ => {
924                                    return Err(RedisError::from((
925                                        redis::ErrorKind::TypeError,
926                                        "events must be an array",
927                                    )));
928                                }
929                            });
930                        }
931                        _ => {} // Ignore unknown fields
932                    }
933                }
934
935                // Ensure all required fields are present
936                let partition_key = partition_key.ok_or_else(|| {
937                    RedisError::from((
938                        redis::ErrorKind::TypeError,
939                        "Missing required field: partition_key",
940                    ))
941                })?;
942                let partition_id = partition_id.ok_or_else(|| {
943                    RedisError::from((
944                        redis::ErrorKind::TypeError,
945                        "Missing required field: partition_id",
946                    ))
947                })?;
948                let first_partition_sequence = first_partition_sequence.ok_or_else(|| {
949                    RedisError::from((
950                        redis::ErrorKind::TypeError,
951                        "Missing required field: first_partition_sequence",
952                    ))
953                })?;
954                let last_partition_sequence = last_partition_sequence.ok_or_else(|| {
955                    RedisError::from((
956                        redis::ErrorKind::TypeError,
957                        "Missing required field: last_partition_sequence",
958                    ))
959                })?;
960                let events = events
961                    .ok_or_else(|| {
962                        RedisError::from((
963                            redis::ErrorKind::TypeError,
964                            "Missing required field: events",
965                        ))
966                    })?
967                    .into_iter()
968                    .enumerate()
969                    .map(|(i, mut event)| {
970                        event.partition_id = partition_id;
971                        event.partition_sequence = first_partition_sequence + i as u64;
972                        event
973                    })
974                    .collect();
975
976                Ok(MultiAppendInfo {
977                    partition_key,
978                    partition_id,
979                    first_partition_sequence,
980                    last_partition_sequence,
981                    events,
982                })
983            }
984            _ => Err(RedisError::from((
985                redis::ErrorKind::TypeError,
986                "Multi append info must be a Redis map",
987            ))),
988        }
989    }
990}
991
/// Information about an event subscription.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubscriptionInfo {
    /// The type of subscription ("subscribe"), read from the
    /// `subscription_type` field of the reply.
    pub subscription_type: String,
    /// The channel/topic name being subscribed to, read from the `channel`
    /// field of the reply.
    pub channel: String,
    /// Current number of active subscriptions on this connection, read from
    /// the integer `active_subscriptions` field of the reply.
    pub active_subscriptions: i64,
}
1002
1003impl FromRedisValue for SubscriptionInfo {
1004    fn from_redis_value(value: &Value) -> RedisResult<Self> {
1005        match value {
1006            Value::Map(fields) => {
1007                let mut subscription_type = None;
1008                let mut channel = None;
1009                let mut active_subscriptions = None;
1010
1011                // Extract fields from map
1012                for (key, val) in fields {
1013                    let field_name = match key {
1014                        Value::SimpleString(s) => s.as_str(),
1015                        _ => continue,
1016                    };
1017
1018                    match field_name {
1019                        "subscription_type" => {
1020                            subscription_type =
1021                                Some(parse_value!(String, val, "subscription_type")?);
1022                        }
1023                        "channel" => {
1024                            channel = Some(parse_value!(String, val, "channel")?);
1025                        }
1026                        "active_subscriptions" => {
1027                            active_subscriptions = Some(match val {
1028                                Value::Int(n) => *n,
1029                                _ => {
1030                                    return Err(RedisError::from((
1031                                        redis::ErrorKind::TypeError,
1032                                        "active_subscriptions must be an integer",
1033                                    )));
1034                                }
1035                            });
1036                        }
1037                        _ => {} // Ignore unknown fields
1038                    }
1039                }
1040
1041                // Ensure all required fields are present
1042                let subscription_type = subscription_type.ok_or_else(|| {
1043                    RedisError::from((
1044                        redis::ErrorKind::TypeError,
1045                        "Missing required field: subscription_type",
1046                    ))
1047                })?;
1048                let channel = channel.ok_or_else(|| {
1049                    RedisError::from((
1050                        redis::ErrorKind::TypeError,
1051                        "Missing required field: channel",
1052                    ))
1053                })?;
1054                let active_subscriptions = active_subscriptions.ok_or_else(|| {
1055                    RedisError::from((
1056                        redis::ErrorKind::TypeError,
1057                        "Missing required field: active_subscriptions",
1058                    ))
1059                })?;
1060
1061                Ok(SubscriptionInfo {
1062                    subscription_type,
1063                    channel,
1064                    active_subscriptions,
1065                })
1066            }
1067            _ => Err(RedisError::from((
1068                redis::ErrorKind::TypeError,
1069                "Subscription info must be a Redis map",
1070            ))),
1071        }
1072    }
1073}