// Source: ursula_stream/snapshot.rs

use serde::{Deserialize, Serialize};
use ursula_shard::BucketStreamId;

use crate::model::{
    ColdChunkRef, HotPayloadSegment, ObjectPayloadRef, ProducerSnapshot, StreamMessageRecord,
    StreamMetadata, StreamVisibleSnapshot,
};
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub struct StreamSnapshot {
11    pub buckets: Vec<String>,
12    pub streams: Vec<StreamSnapshotEntry>,
13}
14
15#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
16pub struct StreamSnapshotEntry {
17    pub metadata: StreamMetadata,
18    pub hot_start_offset: u64,
19    pub payload: Vec<u8>,
20    pub hot_segments: Vec<HotPayloadSegment>,
21    pub cold_chunks: Vec<ColdChunkRef>,
22    pub external_segments: Vec<ObjectPayloadRef>,
23    pub message_records: Vec<StreamMessageRecord>,
24    pub visible_snapshot: Option<StreamVisibleSnapshot>,
25    pub producer_states: Vec<ProducerSnapshot>,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum StreamSnapshotError {
30    DuplicateBucket(String),
31    DuplicateStream(BucketStreamId),
32    DuplicateProducer {
33        stream_id: BucketStreamId,
34        producer_id: String,
35    },
36    MissingBucket(BucketStreamId),
37    PayloadLengthMismatch {
38        stream_id: BucketStreamId,
39        tail_offset: u64,
40        payload_len: usize,
41    },
42    MessageBoundaryMismatch {
43        stream_id: BucketStreamId,
44    },
45    SnapshotOffsetOutOfRange {
46        stream_id: BucketStreamId,
47        snapshot_offset: u64,
48        tail_offset: u64,
49    },
50}
51
52impl std::fmt::Display for StreamSnapshotError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::DuplicateBucket(bucket_id) => {
56                write!(f, "snapshot contains duplicate bucket '{bucket_id}'")
57            }
58            Self::DuplicateStream(stream_id) => {
59                write!(f, "snapshot contains duplicate stream '{stream_id}'")
60            }
61            Self::DuplicateProducer {
62                stream_id,
63                producer_id,
64            } => write!(
65                f,
66                "snapshot stream '{stream_id}' contains duplicate producer '{producer_id}'"
67            ),
68            Self::MissingBucket(stream_id) => {
69                write!(
70                    f,
71                    "snapshot stream '{stream_id}' references a missing bucket"
72                )
73            }
74            Self::PayloadLengthMismatch {
75                stream_id,
76                tail_offset,
77                payload_len,
78            } => write!(
79                f,
80                "snapshot stream '{stream_id}' tail offset {tail_offset} does not match payload length {payload_len}"
81            ),
82            Self::MessageBoundaryMismatch { stream_id } => write!(
83                f,
84                "snapshot stream '{stream_id}' has inconsistent message boundaries"
85            ),
86            Self::SnapshotOffsetOutOfRange {
87                stream_id,
88                snapshot_offset,
89                tail_offset,
90            } => write!(
91                f,
92                "snapshot stream '{stream_id}' visible snapshot offset {snapshot_offset} is beyond tail offset {tail_offset}"
93            ),
94        }
95    }
96}
97
98impl std::error::Error for StreamSnapshotError {}