Skip to main content

kalamdb_commons/
websocket_messages.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6    ids::SeqId,
7    models::{KalamCellValue, Role, SchemaField, UserId},
8    websocket_protocol::ProtocolOptions,
9};
10
11/// Type of change that occurred in the database.
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "lowercase")]
14pub enum ChangeTypeRaw {
15    /// New row(s) inserted.
16    Insert,
17    /// Existing row(s) updated.
18    Update,
19    /// Row(s) deleted.
20    Delete,
21}
22
23/// Batch loading status for a live subscription.
24#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
25#[serde(rename_all = "snake_case")]
26pub enum BatchStatus {
27    /// Initial batch being loaded.
28    Loading,
29    /// Subsequent batches are being loaded.
30    LoadingBatch,
31    /// Initial loading is finished and live updates are active.
32    Ready,
33}
34
35/// Batch control metadata for paginated initial data loading.
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37pub struct BatchControl {
38    /// Current batch number (0-indexed).
39    pub batch_num: u32,
40    /// Whether more batches are available to fetch.
41    pub has_more: bool,
42    /// Loading status for the subscription.
43    pub status: BatchStatus,
44    /// The SeqId of the last row in this batch (used for subsequent requests).
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub last_seq_id: Option<SeqId>,
47}
48
49impl BatchControl {
50    /// Create a batch control for the first batch (batch_num=0).
51    pub fn first(has_more: bool) -> Self {
52        Self {
53            batch_num: 0,
54            has_more,
55            status: if has_more {
56                BatchStatus::Loading
57            } else {
58                BatchStatus::Ready
59            },
60            last_seq_id: None,
61        }
62    }
63
64    /// Create a batch control for a subsequent batch (batch_num > 0).
65    pub fn subsequent(batch_num: u32, has_more: bool) -> Self {
66        Self {
67            batch_num,
68            has_more,
69            status: if has_more {
70                BatchStatus::LoadingBatch
71            } else {
72                BatchStatus::Ready
73            },
74            last_seq_id: None,
75        }
76    }
77
78    /// Create batch control with all fields specified.
79    pub fn new(batch_num: u32, has_more: bool, last_seq_id: Option<SeqId>) -> Self {
80        let status = if batch_num == 0 {
81            if has_more {
82                BatchStatus::Loading
83            } else {
84                BatchStatus::Ready
85            }
86        } else if has_more {
87            BatchStatus::LoadingBatch
88        } else {
89            BatchStatus::Ready
90        };
91
92        Self {
93            batch_num,
94            has_more,
95            status,
96            last_seq_id,
97        }
98    }
99}
100
101/// Options for live query subscriptions.
102#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
103pub struct SubscriptionOptions {
104    /// Hint for server-side batch sizing during initial data load.
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub batch_size: Option<usize>,
107    /// Number of last (newest) rows to fetch for initial data.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub last_rows: Option<u32>,
110    /// Resume subscription from a specific sequence ID.
111    #[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")]
112    pub from: Option<SeqId>,
113    /// Client-side control for automatically requesting subsequent initial data batches.
114    #[serde(default, skip_serializing, alias = "autoFetchBatches")]
115    pub auto_fetch_batches: Option<bool>,
116}
117
118impl SubscriptionOptions {
119    /// Create new subscription options with defaults.
120    pub fn new() -> Self {
121        Self::default()
122    }
123
124    /// Set the batch size for initial data loading.
125    pub fn with_batch_size(mut self, size: usize) -> Self {
126        self.batch_size = Some(size);
127        self
128    }
129
130    /// Set the number of last rows to fetch.
131    pub fn with_last_rows(mut self, count: u32) -> Self {
132        self.last_rows = Some(count);
133        self
134    }
135
136    /// Resume from a specific sequence ID.
137    pub fn with_from(mut self, seq_id: SeqId) -> Self {
138        self.from = Some(seq_id);
139        self
140    }
141
142    /// Resume from a specific sequence ID via deprecated alias.
143    pub fn with_from_seq_id(self, seq_id: SeqId) -> Self {
144        self.with_from(seq_id)
145    }
146
147    /// Control whether the client should automatically request subsequent initial data batches.
148    pub fn with_auto_fetch_batches(mut self, enabled: bool) -> Self {
149        self.auto_fetch_batches = Some(enabled);
150        self
151    }
152
153    /// Check if this contains a resume seq_id.
154    pub fn has_resume_seq_id(&self) -> bool {
155        self.from.is_some()
156    }
157}
158
159/// Subscription request details.
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct SubscriptionRequest {
162    /// Unique subscription identifier (client-generated).
163    pub id: String,
164    /// SQL query for live updates (must be a SELECT statement).
165    pub sql: String,
166    /// Optional subscription options.
167    #[serde(default)]
168    pub options: Option<SubscriptionOptions>,
169}
170
171/// WebSocket message types sent from server to client for the SDK transport.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173#[serde(tag = "type", rename_all = "snake_case")]
174pub enum ServerMessage {
175    /// Authentication successful response (browser clients only).
176    AuthSuccess {
177        user: UserId,
178        role: Role,
179        protocol: ProtocolOptions,
180    },
181    /// Authentication failed response (browser clients only).
182    AuthError { message: String },
183    /// Acknowledgement of successful subscription registration.
184    SubscriptionAck {
185        subscription_id: String,
186        total_rows: u32,
187        batch_control: BatchControl,
188        schema: Vec<SchemaField>,
189    },
190    /// Initial data batch sent after subscription or on client request.
191    InitialDataBatch {
192        subscription_id: String,
193        rows: Vec<HashMap<String, KalamCellValue>>,
194        batch_control: BatchControl,
195    },
196    /// Change notification for INSERT/UPDATE/DELETE operations.
197    Change {
198        subscription_id: String,
199        change_type: ChangeTypeRaw,
200        #[serde(skip_serializing_if = "Option::is_none")]
201        rows: Option<Vec<HashMap<String, KalamCellValue>>>,
202        #[serde(skip_serializing_if = "Option::is_none")]
203        old_values: Option<Vec<HashMap<String, KalamCellValue>>>,
204    },
205    /// Error notification.
206    Error {
207        subscription_id: String,
208        code: String,
209        message: String,
210    },
211}
212
/// Live query requests sent from client to server.
///
/// Only compiled with the `websocket-auth` feature. Serialized with an
/// internal `type` tag holding the `snake_case` variant name (for example
/// `next_batch`).
#[cfg(feature = "websocket-auth")]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Authenticates the WebSocket connection.
    Authenticate {
        /// Flattened: the credential fields sit directly in the message
        /// object rather than under a `credentials` key.
        #[serde(flatten)]
        credentials: crate::websocket_auth::WsAuthCredentials,
        protocol: ProtocolOptions,
    },
    /// Subscribes to live query updates.
    Subscribe {
        /// The subscription being requested.
        subscription: SubscriptionRequest,
    },
    /// Requests the next batch of initial data.
    NextBatch {
        subscription_id: String,
        /// Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        last_seq_id: Option<SeqId>,
    },
    /// Unsubscribes from a live query.
    Unsubscribe {
        /// Identifier of the subscription to drop.
        subscription_id: String,
    },
    /// Application-level keepalive ping.
    Ping,
}
237
#[cfg(feature = "websocket-auth")]
impl ClientMessage {
    /// Wraps `subscription` in a [`ClientMessage::Subscribe`] message.
    pub fn subscribe(subscription: SubscriptionRequest) -> Self {
        ClientMessage::Subscribe { subscription }
    }

    /// Builds a [`ClientMessage::NextBatch`] request for `subscription_id`,
    /// carrying `last_seq_id` unchanged.
    pub fn next_batch(subscription_id: String, last_seq_id: Option<SeqId>) -> Self {
        ClientMessage::NextBatch {
            subscription_id,
            last_seq_id,
        }
    }

    /// Builds a [`ClientMessage::Unsubscribe`] message for `subscription_id`.
    pub fn unsubscribe(subscription_id: String) -> Self {
        ClientMessage::Unsubscribe { subscription_id }
    }
}