1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6 ids::SeqId,
7 models::{KalamCellValue, Role, SchemaField, UserId},
8 websocket_protocol::ProtocolOptions,
9};
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "lowercase")]
14pub enum ChangeTypeRaw {
15 Insert,
17 Update,
19 Delete,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
25#[serde(rename_all = "snake_case")]
26pub enum BatchStatus {
27 Loading,
29 LoadingBatch,
31 Ready,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37pub struct BatchControl {
38 pub batch_num: u32,
40 pub has_more: bool,
42 pub status: BatchStatus,
44 #[serde(skip_serializing_if = "Option::is_none")]
46 pub last_seq_id: Option<SeqId>,
47}
48
49impl BatchControl {
50 pub fn first(has_more: bool) -> Self {
52 Self {
53 batch_num: 0,
54 has_more,
55 status: if has_more {
56 BatchStatus::Loading
57 } else {
58 BatchStatus::Ready
59 },
60 last_seq_id: None,
61 }
62 }
63
64 pub fn subsequent(batch_num: u32, has_more: bool) -> Self {
66 Self {
67 batch_num,
68 has_more,
69 status: if has_more {
70 BatchStatus::LoadingBatch
71 } else {
72 BatchStatus::Ready
73 },
74 last_seq_id: None,
75 }
76 }
77
78 pub fn new(batch_num: u32, has_more: bool, last_seq_id: Option<SeqId>) -> Self {
80 let status = if batch_num == 0 {
81 if has_more {
82 BatchStatus::Loading
83 } else {
84 BatchStatus::Ready
85 }
86 } else if has_more {
87 BatchStatus::LoadingBatch
88 } else {
89 BatchStatus::Ready
90 };
91
92 Self {
93 batch_num,
94 has_more,
95 status,
96 last_seq_id,
97 }
98 }
99}
100
101#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
103pub struct SubscriptionOptions {
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub batch_size: Option<usize>,
107 #[serde(skip_serializing_if = "Option::is_none")]
109 pub last_rows: Option<u32>,
110 #[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")]
112 pub from: Option<SeqId>,
113 #[serde(default, skip_serializing, alias = "autoFetchBatches")]
115 pub auto_fetch_batches: Option<bool>,
116}
117
118impl SubscriptionOptions {
119 pub fn new() -> Self {
121 Self::default()
122 }
123
124 pub fn with_batch_size(mut self, size: usize) -> Self {
126 self.batch_size = Some(size);
127 self
128 }
129
130 pub fn with_last_rows(mut self, count: u32) -> Self {
132 self.last_rows = Some(count);
133 self
134 }
135
136 pub fn with_from(mut self, seq_id: SeqId) -> Self {
138 self.from = Some(seq_id);
139 self
140 }
141
142 pub fn with_from_seq_id(self, seq_id: SeqId) -> Self {
144 self.with_from(seq_id)
145 }
146
147 pub fn with_auto_fetch_batches(mut self, enabled: bool) -> Self {
149 self.auto_fetch_batches = Some(enabled);
150 self
151 }
152
153 pub fn has_resume_seq_id(&self) -> bool {
155 self.from.is_some()
156 }
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct SubscriptionRequest {
162 pub id: String,
164 pub sql: String,
166 #[serde(default)]
168 pub options: Option<SubscriptionOptions>,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173#[serde(tag = "type", rename_all = "snake_case")]
174pub enum ServerMessage {
175 AuthSuccess {
177 user: UserId,
178 role: Role,
179 protocol: ProtocolOptions,
180 },
181 AuthError { message: String },
183 SubscriptionAck {
185 subscription_id: String,
186 total_rows: u32,
187 batch_control: BatchControl,
188 schema: Vec<SchemaField>,
189 },
190 InitialDataBatch {
192 subscription_id: String,
193 rows: Vec<HashMap<String, KalamCellValue>>,
194 batch_control: BatchControl,
195 },
196 Change {
198 subscription_id: String,
199 change_type: ChangeTypeRaw,
200 #[serde(skip_serializing_if = "Option::is_none")]
201 rows: Option<Vec<HashMap<String, KalamCellValue>>>,
202 #[serde(skip_serializing_if = "Option::is_none")]
203 old_values: Option<Vec<HashMap<String, KalamCellValue>>>,
204 },
205 Error {
207 subscription_id: String,
208 code: String,
209 message: String,
210 },
211}
212
213#[cfg(feature = "websocket-auth")]
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(tag = "type", rename_all = "snake_case")]
217pub enum ClientMessage {
218 Authenticate {
220 #[serde(flatten)]
221 credentials: crate::websocket_auth::WsAuthCredentials,
222 protocol: ProtocolOptions,
223 },
224 Subscribe { subscription: SubscriptionRequest },
226 NextBatch {
228 subscription_id: String,
229 #[serde(skip_serializing_if = "Option::is_none")]
230 last_seq_id: Option<SeqId>,
231 },
232 Unsubscribe { subscription_id: String },
234 Ping,
236}
237
238#[cfg(feature = "websocket-auth")]
239impl ClientMessage {
240 pub fn subscribe(subscription: SubscriptionRequest) -> Self {
242 Self::Subscribe { subscription }
243 }
244
245 pub fn next_batch(subscription_id: String, last_seq_id: Option<SeqId>) -> Self {
247 Self::NextBatch {
248 subscription_id,
249 last_seq_id,
250 }
251 }
252
253 pub fn unsubscribe(subscription_id: String) -> Self {
255 Self::Unsubscribe { subscription_id }
256 }
257}