Skip to main content

kcl_async/
message.rs

1pub mod output {
2    use serde::{Deserialize, Serialize};
3
4    #[derive(Debug, Serialize, Deserialize)]
5    #[serde(tag = "action")]
6    pub enum Message {
7        #[serde(rename = "checkpoint")]
8        Checkpoint(CheckpointMessage),
9
10        #[serde(rename = "status")]
11        Status(StatusMessage),
12    }
13
14    #[derive(Debug, Serialize, Deserialize)]
15    pub struct CheckpointMessage {
16        #[serde(rename = "sequenceNumber")]
17        pub sequence_number: Option<String>,
18
19        #[serde(
20            rename = "subSequenceNumber",
21            skip_serializing_if = "Option::is_none",
22            default
23        )]
24        pub sub_sequence_number: Option<u64>,
25    }
26
27    #[derive(Debug, Serialize, Deserialize)]
28    pub struct StatusMessage {
29        #[serde(rename = "responseFor")]
30        pub response_for: String,
31    }
32
33    impl StatusMessage {
34        pub fn from_message(message: &super::input::Message) -> Self {
35            Self {
36                response_for: message.id().into(),
37            }
38        }
39    }
40}
41
42pub mod input {
43    use base64::prelude::*;
44    use serde::{Deserialize, Serialize};
45
46    #[derive(Debug, Serialize, Deserialize)]
47    #[serde(tag = "action")]
48    pub enum Message {
49        #[serde(rename = "checkpoint")]
50        Checkpoint(CheckpointMessage),
51
52        #[serde(rename = "initialize")]
53        Initialize(InitializeMessage),
54
55        #[serde(rename = "processRecords")]
56        ProcessRecords(ProcessRecordsMessage),
57
58        #[serde(rename = "shutdown")]
59        Shutdown(ShutdownMessage),
60
61        #[serde(rename = "shutdownRequested")]
62        ShutdownRequested(ShutdownRequestedMessage),
63
64        #[serde(rename = "leaseLost")]
65        LeaseLost(LeaseLostMessage),
66
67        #[serde(rename = "shardEnded")]
68        ShardEnded(ShardEndedMessage),
69    }
70
71    impl Message {
72        pub const fn id(&self) -> &'static str {
73            match self {
74                Message::Checkpoint(_) => "checkpoint",
75                Message::Initialize(_) => "initialize",
76                Message::ProcessRecords(_) => "processRecords",
77                Message::Shutdown(_) => "shutdown",
78                Message::ShutdownRequested(_) => "shutdownRequested",
79                Message::LeaseLost(_) => "leaseLost",
80                Message::ShardEnded(_) => "shardEnded",
81            }
82        }
83    }
84
85    #[derive(Debug, Serialize, Deserialize)]
86    pub struct CheckpointMessage {
87        #[serde(rename = "sequenceNumber")]
88        pub sequence_number: Option<String>,
89
90        #[serde(
91            rename = "subSequenceNumber",
92            skip_serializing_if = "Option::is_none",
93            default
94        )]
95        pub sub_sequence_number: Option<u64>,
96
97        #[serde(rename = "error", skip_serializing_if = "Option::is_none", default)]
98        pub error: Option<String>,
99    }
100
101    #[derive(Debug, Serialize, Deserialize)]
102    pub struct InitializeMessage {
103        #[serde(rename = "shardId")]
104        pub shard_id: String,
105
106        #[serde(
107            rename = "sequenceNumber",
108            skip_serializing_if = "Option::is_none",
109            default
110        )]
111        pub sequence_number: Option<String>,
112
113        #[serde(
114            rename = "subSequenceNumber",
115            skip_serializing_if = "Option::is_none",
116            default
117        )]
118        pub sub_sequence_number: Option<u64>,
119    }
120
121    #[derive(Debug, Serialize, Deserialize)]
122    pub struct Record {
123        #[serde(rename = "data")]
124        pub base64_data: String,
125
126        #[serde(rename = "partitionKey")]
127        pub partition_key: String,
128
129        #[serde(rename = "sequenceNumber")]
130        pub sequence_number: String,
131
132        #[serde(
133            rename = "subSequenceNumber",
134            skip_serializing_if = "Option::is_none",
135            default
136        )]
137        pub sub_sequence_number: Option<u64>,
138
139        #[serde(
140            rename = "approximateArrivalTimestamp",
141            skip_serializing_if = "Option::is_none",
142            default
143        )]
144        pub approximate_arrival_timestamp_ms: Option<u64>,
145    }
146
147    impl Record {
148        pub fn to_bytes(&self) -> Result<Vec<u8>, base64::DecodeError> {
149            BASE64_STANDARD.decode(&self.base64_data)
150        }
151    }
152
153    #[derive(Debug, Serialize, Deserialize)]
154    pub struct LeaseLostMessage {}
155
156    #[derive(Debug, Serialize, Deserialize)]
157    pub struct ProcessRecordsMessage {
158        #[serde(rename = "records")]
159        pub records: Vec<Record>,
160
161        #[serde(
162            rename = "millisBehindLatest",
163            skip_serializing_if = "Option::is_none",
164            default
165        )]
166        pub millis_behind_latest: Option<u64>,
167    }
168
169    #[derive(Debug, Serialize, Deserialize)]
170    pub struct ShardEndedMessage {}
171
172    #[derive(Debug, Serialize, Deserialize)]
173    pub struct ShutdownMessage {
174        #[serde(rename = "reason", skip_serializing_if = "Option::is_none", default)]
175        pub reason: Option<String>,
176    }
177
178    #[derive(Debug, Serialize, Deserialize)]
179    pub struct ShutdownRequestedMessage {}
180}