use std::{
cell::{Cell, RefCell},
collections::HashMap,
rc::Rc,
sync::atomic::{AtomicU64, Ordering},
};
static SUBSCRIPTION_COUNTER: AtomicU64 = AtomicU64::new(0);
use serde::Serialize;
use wasm_bindgen::{prelude::*, JsCast};
use wasm_bindgen_futures::JsFuture;
use web_sys::{CloseEvent, ErrorEvent, MessageEvent, WebSocket};
use super::{
auth::{resolve_auth_provider, WasmAuthProvider},
helpers::{
create_promise, decode_ws_binary_payload, decode_ws_message, send_ws_message,
serialize_json_to_js_value, subscription_hash,
},
reconnect::{self, reconnect_internal_with_auth, resubscribe_all},
state::{
callback_payload, filter_subscription_event, track_subscription_checkpoint,
SubscriptionCallbackMode, SubscriptionState, WasmLiveRowsOptions,
},
validation::{
quote_table_name, validate_column_name, validate_row_id, validate_sql_identifier,
},
wasm_debug_log,
};
use crate::{
models::{
ChangeEvent, ClientMessage, ConnectionOptions, SerializationType, ServerMessage,
SubscriptionOptions, SubscriptionRequest,
},
SeqId,
};
#[derive(Serialize)]
struct BorrowedQueryRequest<'a> {
sql: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
params: Option<&'a [serde_json::Value]>,
#[serde(default, skip_serializing_if = "Option::is_none")]
namespace_id: Option<&'a str>,
}
#[wasm_bindgen]
pub struct KalamClient {
url: String,
auth: Rc<RefCell<WasmAuthProvider>>,
ws: Rc<RefCell<Option<WebSocket>>>,
subscription_state: Rc<RefCell<HashMap<String, SubscriptionState>>>,
connection_options: Rc<RefCell<ConnectionOptions>>,
reconnect_attempts: Rc<RefCell<u32>>,
is_reconnecting: Rc<RefCell<bool>>,
ping_interval_id: Rc<RefCell<i32>>,
on_connect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_disconnect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_error_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_receive_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_send_cb: Rc<RefCell<Option<js_sys::Function>>>,
auth_provider_cb: Rc<RefCell<Option<js_sys::Function>>>,
default_namespace: Rc<RefCell<Option<String>>>,
negotiated_ser: Rc<Cell<SerializationType>>,
}
impl KalamClient {
async fn register_subscription(
&self,
sql: String,
subscription_options: SubscriptionOptions,
callback: js_sys::Function,
callback_mode: SubscriptionCallbackMode,
) -> Result<String, JsValue> {
if !self.is_connected() {
return Err(JsValue::from_str("Not connected to server. Call connect() first."));
}
let subscription_id = format!(
"sub-{:x}-{}",
subscription_hash(&sql),
SUBSCRIPTION_COUNTER.fetch_add(1, Ordering::Relaxed),
);
let (subscribe_promise, subscribe_resolve, subscribe_reject) = create_promise();
self.subscription_state.borrow_mut().insert(
subscription_id.clone(),
SubscriptionState {
sql: sql.clone(),
options: subscription_options.clone(),
callback,
last_seq_id: None,
pending_subscribe_resolve: Some(subscribe_resolve),
pending_subscribe_reject: Some(subscribe_reject),
awaiting_initial_response: true,
callback_mode,
},
);
if let Some(ws) = self.ws.borrow().as_ref() {
let subscribe_msg = ClientMessage::Subscribe {
subscription: SubscriptionRequest {
id: subscription_id.clone(),
sql: sql.clone(),
options: Some(subscription_options),
},
};
wasm_debug_log!(&format!(
"KalamClient: Sending subscribe request - id: {}, sql: {}",
subscription_id, sql
));
if let Err(error) = send_ws_message_traced(
ws,
&subscribe_msg,
self.negotiated_ser.get(),
&self.on_send_cb,
) {
self.subscription_state.borrow_mut().remove(&subscription_id);
return Err(error);
}
} else {
self.subscription_state.borrow_mut().remove(&subscription_id);
return Err(JsValue::from_str(
"WebSocket connection is unavailable for subscription registration",
));
}
JsFuture::from(subscribe_promise).await?;
wasm_debug_log!(&format!("KalamClient: Subscribed with ID: {}", subscription_id));
Ok(subscription_id)
}
fn new_with_auth(url: String, auth: WasmAuthProvider) -> KalamClient {
KalamClient {
url,
auth: Rc::new(RefCell::new(auth)),
ws: Rc::new(RefCell::new(None)),
subscription_state: Rc::new(RefCell::new(HashMap::new())),
connection_options: Rc::new(RefCell::new(ConnectionOptions::default())),
reconnect_attempts: Rc::new(RefCell::new(0)),
is_reconnecting: Rc::new(RefCell::new(false)),
ping_interval_id: Rc::new(RefCell::new(-1)),
on_connect_cb: Rc::new(RefCell::new(None)),
on_disconnect_cb: Rc::new(RefCell::new(None)),
on_error_cb: Rc::new(RefCell::new(None)),
on_receive_cb: Rc::new(RefCell::new(None)),
on_send_cb: Rc::new(RefCell::new(None)),
auth_provider_cb: Rc::new(RefCell::new(None)),
default_namespace: Rc::new(RefCell::new(None)),
negotiated_ser: Rc::new(Cell::new(SerializationType::Json)),
}
}
}
fn emit_ws_send(on_send_cb: &Rc<RefCell<Option<js_sys::Function>>>, msg: &ClientMessage) {
let Some(cb) = on_send_cb.borrow().as_ref().cloned() else {
return;
};
if let Ok(json) = serde_json::to_string(msg) {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&json));
}
}
fn send_ws_message_traced(
ws: &WebSocket,
msg: &ClientMessage,
serialization: SerializationType,
on_send_cb: &Rc<RefCell<Option<js_sys::Function>>>,
) -> Result<(), JsValue> {
send_ws_message(ws, msg, serialization)?;
emit_ws_send(on_send_cb, msg);
Ok(())
}
fn next_batch_message(subscription_id: &str, last_seq_id: Option<SeqId>) -> ClientMessage {
ClientMessage::NextBatch {
subscription_id: subscription_id.to_string(),
last_seq_id,
}
}
fn send_next_batch_traced(
ws: &WebSocket,
subscription_id: &str,
last_seq_id: Option<SeqId>,
serialization: SerializationType,
on_send_cb: &Rc<RefCell<Option<js_sys::Function>>>,
) -> Result<(), JsValue> {
let msg = next_batch_message(subscription_id, last_seq_id);
send_ws_message_traced(ws, &msg, serialization, on_send_cb)
}
fn normalize_default_namespace(namespace: Option<String>) -> Option<String> {
namespace.and_then(|value| {
let trimmed = value.trim();
(!trimmed.is_empty()).then(|| trimmed.to_string())
})
}
fn quote_identifier(identifier: &str) -> String {
format!("\"{}\"", identifier.replace('"', "\"\""))
}
fn find_subscription_from_clause(sql: &str) -> Option<usize> {
let chars: Vec<(usize, char)> = sql.char_indices().collect();
let mut in_single_quotes = false;
let mut in_double_quotes = false;
let mut i = 0;
while i < chars.len() {
let (index, ch) = chars[i];
match ch {
'\'' if !in_double_quotes => in_single_quotes = !in_single_quotes,
'"' if !in_single_quotes => in_double_quotes = !in_double_quotes,
_ if !in_single_quotes && !in_double_quotes => {
let candidate = &sql[index..];
if candidate.len() >= 4
&& candidate[..4].eq_ignore_ascii_case("from")
&& candidate[4..].chars().next().map_or(true, char::is_whitespace)
{
return Some(index + 4);
}
},
_ => {},
}
i += 1;
}
None
}
fn find_subscription_relation_end(sql: &str, relation_start: usize) -> usize {
let mut in_single_quotes = false;
let mut in_double_quotes = false;
for (index, ch) in sql[relation_start..].char_indices() {
match ch {
'\'' if !in_double_quotes => in_single_quotes = !in_single_quotes,
'"' if !in_single_quotes => in_double_quotes = !in_double_quotes,
_ if !in_single_quotes && !in_double_quotes && ch.is_whitespace() => {
return relation_start + index;
},
_ => {},
}
}
sql.len()
}
fn split_identifier_parts(identifier: &str) -> Result<Vec<String>, ()> {
let mut parts = Vec::new();
let mut current = String::new();
let mut in_double_quotes = false;
for ch in identifier.chars() {
match ch {
'"' => {
in_double_quotes = !in_double_quotes;
current.push(ch);
},
'.' if !in_double_quotes => {
if current.trim().is_empty() {
return Err(());
}
parts.push(current.trim().to_string());
current.clear();
},
_ => current.push(ch),
}
}
if in_double_quotes || current.trim().is_empty() {
return Err(());
}
parts.push(current.trim().to_string());
Ok(parts)
}
fn qualify_subscription_sql(sql: &str, default_namespace: Option<&str>) -> String {
let Some(default_namespace) = default_namespace else {
return sql.trim().trim_end_matches(';').trim().to_string();
};
let trimmed = sql.trim().trim_end_matches(';').trim();
if trimmed.is_empty() {
return String::new();
}
let Some(from_idx) = find_subscription_from_clause(trimmed) else {
return trimmed.to_string();
};
let relation_start = trimmed[from_idx..]
.char_indices()
.find_map(|(offset, ch)| (!ch.is_whitespace()).then_some(from_idx + offset));
let Some(relation_start) = relation_start else {
return trimmed.to_string();
};
let relation_end = find_subscription_relation_end(trimmed, relation_start);
let relation = trimmed[relation_start..relation_end].trim();
let Ok(parts) = split_identifier_parts(relation) else {
return trimmed.to_string();
};
if parts.len() != 1 {
return trimmed.to_string();
}
let qualified_relation = format!("{}.{}", quote_identifier(default_namespace), relation);
format!(
"{}{}{}",
&trimmed[..relation_start],
qualified_relation,
&trimmed[relation_end..]
)
}
fn reject_pending_subscriptions(
subscriptions: &Rc<RefCell<HashMap<String, SubscriptionState>>>,
message: &str,
) {
let mut pending_rejects = Vec::new();
{
let mut subs = subscriptions.borrow_mut();
let pending_ids: Vec<String> = subs
.iter()
.filter_map(|(id, state)| {
if state.awaiting_initial_response {
Some(id.clone())
} else {
None
}
})
.collect();
for id in pending_ids {
if let Some(mut state) = subs.remove(&id) {
state.awaiting_initial_response = false;
state.pending_subscribe_resolve = None;
if let Some(reject) = state.pending_subscribe_reject.take() {
pending_rejects.push(reject);
}
}
}
}
for reject in pending_rejects {
let _ = reject.call1(&JsValue::NULL, &JsValue::from_str(message));
}
}
struct SubscriptionDispatch {
callback: Option<js_sys::Function>,
payload: Option<String>,
resolve_subscribe: Option<js_sys::Function>,
reject_subscribe: Option<(js_sys::Function, String)>,
next_batch: Option<(String, Option<SeqId>)>,
}
impl SubscriptionDispatch {
fn invoke(self) -> Option<(String, Option<SeqId>)> {
if let Some(cb) = self.callback {
if let Some(payload) = self.payload {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&payload));
}
}
if let Some(resolve) = self.resolve_subscribe {
let _ = resolve.call0(&JsValue::NULL);
}
if let Some((reject, reason)) = self.reject_subscribe {
let _ = reject.call1(&JsValue::NULL, &JsValue::from_str(&reason));
}
self.next_batch
}
}
fn subscription_id_from_server_message(event: &ServerMessage) -> Option<&str> {
match event {
ServerMessage::SubscriptionAck {
subscription_id,
total_rows: _total_rows,
..
} => {
wasm_debug_log!(&format!(
"KalamClient: Parsed SubscriptionAck - id: {}, total_rows: {}",
subscription_id, _total_rows
));
Some(subscription_id.as_str())
},
ServerMessage::InitialDataBatch {
subscription_id,
batch_control: _batch_control,
rows: _rows,
} => {
wasm_debug_log!(&format!(
"KalamClient: Parsed InitialDataBatch - id: {}, rows: {}, status: {:?}",
subscription_id,
_rows.len(),
_batch_control.status
));
Some(subscription_id.as_str())
},
ServerMessage::Change {
subscription_id,
change_type: _change_type,
rows: _rows,
old_values: _,
} => {
wasm_debug_log!(&format!(
"KalamClient: Parsed Change - id: {}, type: {:?}, rows: {:?}",
subscription_id,
_change_type,
_rows.as_ref().map(|value| value.len())
));
Some(subscription_id.as_str())
},
ServerMessage::Error {
subscription_id,
code: _code,
message: _message,
..
} => {
wasm_debug_log!(&format!(
"KalamClient: Parsed Error - id: {}, code: {}, msg: {}",
subscription_id, _code, _message
));
Some(subscription_id.as_str())
},
_ => None,
}
}
fn resolve_subscription_key<'a>(
subscription_id: &'a str,
subscriptions: &HashMap<String, SubscriptionState>,
) -> Option<&'a str> {
if subscriptions.contains_key(subscription_id) {
Some(subscription_id)
} else {
None
}
}
fn dispatch_subscription_server_message(
subscriptions: &Rc<RefCell<HashMap<String, SubscriptionState>>>,
event: &ServerMessage,
) -> Option<SubscriptionDispatch> {
let subscription_id = subscription_id_from_server_message(event)?;
let matched_key = {
let subs = subscriptions.borrow();
wasm_debug_log!(&format!(
"KalamClient: Looking for callback for subscription_id: {} (registered subs: {})",
subscription_id,
subs.len()
));
resolve_subscription_key(&subscription_id, &subs)
};
let Some(client_id) = matched_key else {
wasm_debug_log!(&format!(
"KalamClient: No callback found for subscription_id: {}",
subscription_id
));
return None;
};
let mut callback = None;
let mut payload = None;
let mut resolve_subscribe = None;
let mut reject_subscribe = None;
let mut next_batch = None;
let mut remove_state = false;
{
let mut subs = subscriptions.borrow_mut();
if let Some(state) = subs.get_mut(client_id) {
callback = Some(state.callback.clone());
if let Some(filtered_event) = filter_subscription_event(&state.options, event) {
track_subscription_checkpoint(&mut state.last_seq_id, &filtered_event);
payload = callback_payload(&mut state.callback_mode, &filtered_event);
if let ChangeEvent::InitialDataBatch { batch_control, .. } = filtered_event {
if batch_control.has_more && state.options.auto_fetch_batches.unwrap_or(false) {
next_batch = Some((client_id.to_string(), state.last_seq_id));
}
}
}
match event {
ServerMessage::SubscriptionAck { .. } => {
if state.awaiting_initial_response {
state.awaiting_initial_response = false;
state.pending_subscribe_reject = None;
resolve_subscribe = state.pending_subscribe_resolve.take();
}
},
ServerMessage::Error { code, message, .. } => {
if state.awaiting_initial_response {
state.awaiting_initial_response = false;
state.pending_subscribe_resolve = None;
if let Some(reject) = state.pending_subscribe_reject.take() {
reject_subscribe = Some((
reject,
format!("Subscription failed ({}): {}", code, message),
));
}
remove_state = true;
}
},
_ => {},
}
}
if remove_state {
subs.remove(client_id);
}
}
Some(SubscriptionDispatch {
callback,
payload,
resolve_subscribe,
reject_subscribe,
next_batch,
})
}
fn emit_runtime_ws_error(
on_error_cb: &Rc<RefCell<Option<js_sys::Function>>>,
error: &crate::event_handlers::ConnectionError,
) {
if let Some(cb) = on_error_cb.borrow().as_ref() {
let err_obj = js_sys::Object::new();
let _ =
js_sys::Reflect::set(&err_obj, &"message".into(), &JsValue::from_str(&error.message));
let _ = js_sys::Reflect::set(
&err_obj,
&"recoverable".into(),
&JsValue::from_bool(error.recoverable),
);
if let Some(url) = &error.url {
let _ = js_sys::Reflect::set(&err_obj, &"url".into(), &JsValue::from_str(url));
}
if let Some(auth_user) = &error.auth_user {
let _ =
js_sys::Reflect::set(&err_obj, &"authUser".into(), &JsValue::from_str(auth_user));
}
if let Some(hint) = &error.hint {
let _ = js_sys::Reflect::set(&err_obj, &"hint".into(), &JsValue::from_str(hint));
}
let _ = cb.call1(&JsValue::NULL, &err_obj);
}
}
fn auth_user_from_wasm_auth(auth: &WasmAuthProvider) -> Option<&str> {
match auth {
WasmAuthProvider::Basic { username, .. } => Some(username.as_str()),
_ => None,
}
}
fn connection_hint(detail: &str, recoverable: bool, auth_user: Option<&str>) -> &'static str {
let normalized = detail.to_lowercase();
if normalized.contains("invalid url")
|| normalized.contains("relative url")
|| normalized.contains("base url")
|| normalized.contains("url parse")
{
return "Check the configured KalamDB URL. Use an absolute http:// or https:// base URL that the client can reach.";
}
if normalized.contains("401")
|| normalized.contains("403")
|| normalized.contains("unauthorized")
|| normalized.contains("authentication")
|| normalized.contains("token")
|| normalized.contains("invalid credentials")
{
return if auth_user.is_some() {
"Verify the configured auth user and password or JWT token. Basic auth must login() successfully before opening realtime connections."
} else {
"Verify the configured JWT token or auth provider. Realtime connections require valid authentication before subscribing."
};
}
if recoverable {
return "Verify KalamDB is running and reachable at the configured URL from this client, then retry.";
}
"Review the connection configuration and authentication settings for this client."
}
fn build_runtime_connection_error(
context: &str,
detail: &str,
recoverable: bool,
url: &str,
auth_user: Option<&str>,
) -> crate::event_handlers::ConnectionError {
let auth_fragment = auth_user.map(|user| format!(" for user \"{}\"", user)).unwrap_or_default();
let message = if context.is_empty() {
format!(
"{}{} at {}. Hint: {}",
detail,
auth_fragment,
url,
connection_hint(detail, recoverable, auth_user)
)
} else {
format!(
"{}{} at {}: {}. Hint: {}",
context,
auth_fragment,
url,
detail,
connection_hint(detail, recoverable, auth_user)
)
};
let mut error = crate::event_handlers::ConnectionError::new(message, recoverable).with_url(url);
if let Some(user) = auth_user {
error = error.with_auth_user(user);
}
error.with_hint(connection_hint(detail, recoverable, auth_user))
}
fn js_error_message(error: &JsValue) -> String {
error
.as_string()
.or_else(|| {
js_sys::Reflect::get(error, &"message".into())
.ok()
.and_then(|value| value.as_string())
})
.unwrap_or_else(|| format!("{:?}", error))
}
fn is_recoverable_preconnect_error(message: &str) -> bool {
let normalized = message.to_lowercase();
!(normalized.contains("401")
|| normalized.contains("403")
|| normalized.contains("unauthorized")
|| normalized.contains("authentication")
|| normalized.contains("token")
|| normalized.contains("invalid credentials")
|| normalized.contains("invalid url")
|| normalized.contains("relative url")
|| normalized.contains("configuration error")
|| normalized.contains("base url"))
}
fn emit_runtime_js_error(
on_error_cb: &Rc<RefCell<Option<js_sys::Function>>>,
error: &JsValue,
context: &str,
url: &str,
auth_user: Option<&str>,
) {
let detail = js_error_message(error);
let recoverable = is_recoverable_preconnect_error(&detail);
let event = build_runtime_connection_error(context, &detail, recoverable, url, auth_user);
emit_runtime_ws_error(on_error_cb, &event);
}
fn clear_active_socket(
ws_ref: &Rc<RefCell<Option<WebSocket>>>,
source_ws: &WebSocket,
ping_interval_id: &Rc<RefCell<i32>>,
) -> bool {
let should_clear = ws_ref
.borrow()
.as_ref()
.is_some_and(|current_ws| js_sys::Object::is(current_ws.as_ref(), source_ws.as_ref()));
if !should_clear {
return false;
}
if let Some(current_ws) = ws_ref.borrow_mut().take() {
current_ws.set_onclose(None);
current_ws.set_onerror(None);
current_ws.set_onmessage(None);
}
let id = *ping_interval_id.borrow();
if id >= 0 {
super::helpers::global_clear_interval(id);
*ping_interval_id.borrow_mut() = -1;
}
true
}
#[allow(clippy::too_many_arguments)]
fn schedule_auto_reconnect(
connection_options: Rc<RefCell<ConnectionOptions>>,
subscription_state: Rc<RefCell<HashMap<String, SubscriptionState>>>,
reconnect_attempts: Rc<RefCell<u32>>,
is_reconnecting: Rc<RefCell<bool>>,
ws_ref: Rc<RefCell<Option<WebSocket>>>,
ping_interval_id: Rc<RefCell<i32>>,
url: String,
auth: WasmAuthProvider,
auth_provider_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_connect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_disconnect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_error_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_receive_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_send_cb: Rc<RefCell<Option<js_sys::Function>>>,
negotiated_ser: Rc<Cell<SerializationType>>,
) {
let (delay, disable_compression) = {
let opts = connection_options.borrow();
if !opts.auto_reconnect || *is_reconnecting.borrow() {
return;
}
let current_attempts = *reconnect_attempts.borrow();
if let Some(max) = opts.max_reconnect_attempts {
if current_attempts >= max {
wasm_debug_log!(&format!(
"KalamClient: Max reconnection attempts ({}) reached",
max
));
let event = build_runtime_connection_error(
"Max reconnection attempts reached",
"auto-reconnect stopped after exhausting the configured retry budget",
false,
&url,
auth_user_from_wasm_auth(&auth),
);
emit_runtime_ws_error(&on_error_cb, &event);
return;
}
}
(
std::cmp::min(
opts.reconnect_delay_ms * (2u64.pow(current_attempts)),
opts.max_reconnect_delay_ms,
),
opts.disable_compression,
)
};
wasm_debug_log!(&format!(
"KalamClient: Scheduling reconnection in {}ms (attempt {})",
delay,
*reconnect_attempts.borrow() + 1
));
let reconnect_fn = Closure::wrap(Box::new(move || {
{
let opts = connection_options.borrow();
if !opts.auto_reconnect {
return;
}
}
if ws_ref.borrow().is_some() {
return;
}
*is_reconnecting.borrow_mut() = true;
*reconnect_attempts.borrow_mut() += 1;
let reconnect_url = url.clone();
let reconnect_auth = auth.clone();
let reconnect_auth_provider = Rc::clone(&auth_provider_cb);
let reconnect_connection_options = Rc::clone(&connection_options);
let reconnect_subscription_state = Rc::clone(&subscription_state);
let reconnect_reconnect_attempts = Rc::clone(&reconnect_attempts);
let reconnect_is_reconnecting = Rc::clone(&is_reconnecting);
let reconnect_ws_ref = Rc::clone(&ws_ref);
let reconnect_ping_interval_id = Rc::clone(&ping_interval_id);
let reconnect_on_connect = Rc::clone(&on_connect_cb);
let reconnect_on_disconnect = Rc::clone(&on_disconnect_cb);
let reconnect_on_error = Rc::clone(&on_error_cb);
let reconnect_on_receive = Rc::clone(&on_receive_cb);
let reconnect_on_send = Rc::clone(&on_send_cb);
let reconnect_negotiated_ser = Rc::clone(&negotiated_ser);
let next_url = url.clone();
let next_auth = auth.clone();
let next_auth_provider = Rc::clone(&auth_provider_cb);
let next_connection_options = Rc::clone(&connection_options);
let next_subscription_state = Rc::clone(&subscription_state);
let next_reconnect_attempts = Rc::clone(&reconnect_attempts);
let next_is_reconnecting = Rc::clone(&is_reconnecting);
let next_ws_ref = Rc::clone(&ws_ref);
let next_ping_interval_id = Rc::clone(&ping_interval_id);
let next_on_connect = Rc::clone(&on_connect_cb);
let next_on_disconnect = Rc::clone(&on_disconnect_cb);
let next_on_error = Rc::clone(&on_error_cb);
let next_on_receive = Rc::clone(&on_receive_cb);
let next_on_send = Rc::clone(&on_send_cb);
let next_negotiated_ser = Rc::clone(&negotiated_ser);
let reconnect_error_url = reconnect_url.clone();
let reconnect_error_auth = reconnect_auth.clone();
wasm_bindgen_futures::spawn_local(async move {
match reconnect_internal_with_auth(
reconnect_url,
reconnect_auth,
reconnect_auth_provider.borrow().clone(),
disable_compression,
)
.await
{
Ok(ws) => {
*reconnect_ws_ref.borrow_mut() = Some(ws.clone());
install_runtime_disconnect_handlers(
&ws,
Rc::clone(&reconnect_subscription_state),
Rc::clone(&reconnect_ws_ref),
Rc::clone(&reconnect_ping_interval_id),
Rc::clone(&reconnect_on_disconnect),
Rc::clone(&reconnect_on_error),
reconnect_error_url.clone(),
auth_user_from_wasm_auth(&reconnect_error_auth).map(str::to_owned),
);
install_runtime_message_handler(
&ws,
Rc::clone(&reconnect_subscription_state),
Rc::clone(&reconnect_on_receive),
Rc::clone(&reconnect_on_send),
Rc::clone(&reconnect_negotiated_ser),
);
if let Some(cb) = reconnect_on_connect.borrow().as_ref() {
let _ = cb.call0(&JsValue::NULL);
}
wasm_debug_log!("KalamClient: Reconnection successful");
*reconnect_reconnect_attempts.borrow_mut() = 0;
install_auto_reconnect_listener(
&ws,
Rc::clone(&next_connection_options),
Rc::clone(&reconnect_subscription_state),
Rc::clone(&next_reconnect_attempts),
Rc::clone(&next_is_reconnecting),
Rc::clone(&next_ws_ref),
Rc::clone(&next_ping_interval_id),
next_url,
next_auth,
Rc::clone(&next_auth_provider),
Rc::clone(&next_on_connect),
Rc::clone(&next_on_disconnect),
Rc::clone(&next_on_error),
Rc::clone(&next_on_receive),
Rc::clone(&next_on_send),
Rc::clone(&next_negotiated_ser),
);
resubscribe_all(
Rc::clone(&reconnect_ws_ref),
Rc::clone(&reconnect_subscription_state),
reconnect_negotiated_ser.get(),
Some(Rc::clone(&reconnect_on_send)),
)
.await;
reconnect::restart_ping_timer(
&reconnect_ws_ref,
&reconnect_connection_options,
&reconnect_ping_interval_id,
&reconnect_negotiated_ser,
);
*reconnect_is_reconnecting.borrow_mut() = false;
},
Err(_e) => {
wasm_debug_log!(&format!("KalamClient: Reconnection failed: {:?}", _e));
*reconnect_is_reconnecting.borrow_mut() = false;
schedule_auto_reconnect(
next_connection_options,
next_subscription_state,
next_reconnect_attempts,
next_is_reconnecting,
next_ws_ref,
next_ping_interval_id,
next_url,
next_auth,
next_auth_provider,
next_on_connect,
next_on_disconnect,
next_on_error,
next_on_receive,
next_on_send,
next_negotiated_ser,
);
},
}
});
}) as Box<dyn FnMut()>);
super::helpers::global_set_timeout(reconnect_fn.as_ref().unchecked_ref(), delay as i32);
reconnect_fn.forget();
}
fn install_runtime_disconnect_handlers(
ws: &WebSocket,
subscriptions: Rc<RefCell<HashMap<String, SubscriptionState>>>,
ws_ref: Rc<RefCell<Option<WebSocket>>>,
ping_interval_id: Rc<RefCell<i32>>,
on_disconnect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_error_cb: Rc<RefCell<Option<js_sys::Function>>>,
url: String,
auth_user: Option<String>,
) {
let subscriptions_for_error = Rc::clone(&subscriptions);
let on_error_for_err = Rc::clone(&on_error_cb);
let ws_ref_for_error = Rc::clone(&ws_ref);
let ping_interval_id_for_error = Rc::clone(&ping_interval_id);
let source_ws_for_error = ws.clone();
let onerror_callback = Closure::wrap(Box::new(move |_e: ErrorEvent| {
wasm_debug_log!(&format!("KalamClient: WebSocket error: {:?}", _e));
if !clear_active_socket(
&ws_ref_for_error,
&source_ws_for_error,
&ping_interval_id_for_error,
) {
return;
}
let event = build_runtime_connection_error(
"Failed to connect to KalamDB",
"WebSocket connection failed",
true,
&url,
auth_user.as_deref(),
);
emit_runtime_ws_error(&on_error_for_err, &event);
reject_pending_subscriptions(
&subscriptions_for_error,
"WebSocket connection failed before the subscription was acknowledged",
);
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
let subscriptions_for_close = Rc::clone(&subscriptions);
let on_disconnect_for_close = Rc::clone(&on_disconnect_cb);
let ws_ref_for_close = Rc::clone(&ws_ref);
let ping_interval_id_for_close = Rc::clone(&ping_interval_id);
let source_ws_for_close = ws.clone();
let onclose_callback = Closure::wrap(Box::new(move |e: CloseEvent| {
wasm_debug_log!(&format!(
"KalamClient: WebSocket closed: code={}, reason={}",
e.code(),
e.reason()
));
if !clear_active_socket(
&ws_ref_for_close,
&source_ws_for_close,
&ping_interval_id_for_close,
) {
return;
}
if let Some(cb) = on_disconnect_for_close.borrow().as_ref() {
let reason_obj = js_sys::Object::new();
let _ = js_sys::Reflect::set(
&reason_obj,
&"message".into(),
&JsValue::from_str(&e.reason()),
);
let _ = js_sys::Reflect::set(
&reason_obj,
&"code".into(),
&JsValue::from_f64(e.code() as f64),
);
let _ = cb.call1(&JsValue::NULL, &reason_obj);
}
let close_message = if e.reason().is_empty() {
format!("WebSocket closed before the subscription was acknowledged (code {})", e.code())
} else {
format!("WebSocket closed before the subscription was acknowledged: {}", e.reason())
};
reject_pending_subscriptions(&subscriptions_for_close, &close_message);
}) as Box<dyn FnMut(CloseEvent)>);
ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
}
fn install_runtime_message_handler(
ws: &WebSocket,
subscriptions: Rc<RefCell<HashMap<String, SubscriptionState>>>,
on_receive_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_send_cb: Rc<RefCell<Option<js_sys::Function>>>,
negotiated_ser: Rc<Cell<SerializationType>>,
) {
let ws_for_next_batch = ws.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
let event: Option<ServerMessage> = (|| {
let data = e.data();
if data.is_instance_of::<js_sys::JsString>() {
let text: String = data.dyn_into::<js_sys::JsString>().ok()?.into();
if let Some(cb) = on_receive_cb.borrow().as_ref() {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&text));
}
serde_json::from_str::<ServerMessage>(&text).ok()
} else if negotiated_ser.get() == SerializationType::MessagePack {
let raw = decode_ws_binary_payload(&e)?;
rmp_serde::from_slice::<ServerMessage>(&raw).ok()
} else {
let message = decode_ws_message(&e)?;
if let Some(cb) = on_receive_cb.borrow().as_ref() {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&message));
}
serde_json::from_str::<ServerMessage>(&message).ok()
}
})();
if let Some(event) = event {
if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) {
if let Some((subscription_id, last_seq_id)) = dispatch.invoke() {
let _ = send_next_batch_traced(
&ws_for_next_batch,
&subscription_id,
last_seq_id,
negotiated_ser.get(),
&on_send_cb,
);
}
}
}
}) as Box<dyn FnMut(MessageEvent)>);
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
}
#[wasm_bindgen]
impl KalamClient {
#[wasm_bindgen(constructor)]
pub fn new(url: String, username: String, password: String) -> Result<KalamClient, JsValue> {
if url.is_empty() {
return Err(JsValue::from_str(
"KalamClient: 'url' parameter is required and cannot be empty",
));
}
if username.is_empty() {
return Err(JsValue::from_str(
"KalamClient: 'username' parameter is required and cannot be empty",
));
}
if password.is_empty() {
return Err(JsValue::from_str(
"KalamClient: 'password' parameter is required and cannot be empty",
));
}
Ok(KalamClient::new_with_auth(url, WasmAuthProvider::Basic { username, password }))
}
#[wasm_bindgen(js_name = withJwt)]
pub fn with_jwt(url: String, token: String) -> Result<KalamClient, JsValue> {
if url.is_empty() {
return Err(JsValue::from_str(
"KalamClient.withJwt: 'url' parameter is required and cannot be empty",
));
}
if token.is_empty() {
return Err(JsValue::from_str(
"KalamClient.withJwt: 'token' parameter is required and cannot be empty",
));
}
Ok(KalamClient::new_with_auth(url, WasmAuthProvider::Jwt { token }))
}
#[wasm_bindgen(js_name = anonymous)]
pub fn anonymous(url: String) -> Result<KalamClient, JsValue> {
if url.is_empty() {
return Err(JsValue::from_str(
"KalamClient.anonymous: 'url' parameter is required and cannot be empty",
));
}
Ok(KalamClient::new_with_auth(url, WasmAuthProvider::None))
}
#[wasm_bindgen(js_name = getAuthType)]
pub fn get_auth_type(&self) -> String {
match &*self.auth.borrow() {
WasmAuthProvider::Basic { .. } => "basic".to_string(),
WasmAuthProvider::Jwt { .. } => "jwt".to_string(),
WasmAuthProvider::None => "none".to_string(),
}
}
#[wasm_bindgen(js_name = setAutoReconnect)]
pub fn set_auto_reconnect(&self, enabled: bool) {
self.connection_options.borrow_mut().auto_reconnect = enabled;
}
#[wasm_bindgen(js_name = setReconnectDelay)]
pub fn set_reconnect_delay(&self, initial_delay_ms: u64, max_delay_ms: u64) {
let mut opts = self.connection_options.borrow_mut();
opts.reconnect_delay_ms = initial_delay_ms;
opts.max_reconnect_delay_ms = max_delay_ms;
}
#[wasm_bindgen(js_name = setMaxReconnectAttempts)]
pub fn set_max_reconnect_attempts(&self, max_attempts: u32) {
self.connection_options.borrow_mut().max_reconnect_attempts = if max_attempts == 0 {
None
} else {
Some(max_attempts)
};
}
#[wasm_bindgen(js_name = getReconnectAttempts)]
pub fn get_reconnect_attempts(&self) -> u32 {
*self.reconnect_attempts.borrow()
}
#[wasm_bindgen(js_name = isReconnecting)]
pub fn is_reconnecting_flag(&self) -> bool {
*self.is_reconnecting.borrow()
}
#[wasm_bindgen(js_name = getLastSeqId)]
pub fn get_last_seq_id(&self, subscription_id: String) -> Option<String> {
self.subscription_state
.borrow()
.get(&subscription_id)
.and_then(|state| state.last_seq_id.map(|seq| seq.to_string()))
}
#[wasm_bindgen(js_name = onConnect)]
pub fn on_connect(&self, callback: js_sys::Function) {
*self.on_connect_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = onDisconnect)]
pub fn on_disconnect(&self, callback: js_sys::Function) {
*self.on_disconnect_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = onError)]
pub fn on_error(&self, callback: js_sys::Function) {
*self.on_error_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = onConnectionError)]
pub fn on_connection_error(&self, callback: js_sys::Function) {
*self.on_error_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = onReceive)]
pub fn on_receive(&self, callback: js_sys::Function) {
*self.on_receive_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = onSend)]
pub fn on_send(&self, callback: js_sys::Function) {
*self.on_send_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = setAuthProvider)]
pub fn set_auth_provider(&self, callback: js_sys::Function) {
*self.auth_provider_cb.borrow_mut() = Some(callback);
}
#[wasm_bindgen(js_name = setDefaultNamespace)]
pub fn set_default_namespace(&self, namespace: Option<String>) {
*self.default_namespace.borrow_mut() = normalize_default_namespace(namespace);
}
#[wasm_bindgen(js_name = clearAuthProvider)]
pub fn clear_auth_provider(&self) {
*self.auth_provider_cb.borrow_mut() = None;
}
#[wasm_bindgen(js_name = setDisableCompression)]
pub fn set_disable_compression(&self, disable: bool) {
self.connection_options.borrow_mut().disable_compression = disable;
}
#[wasm_bindgen(js_name = setWsLazyConnect)]
pub fn set_ws_lazy_connect(&self, lazy: bool) {
self.connection_options.borrow_mut().ws_lazy_connect = lazy;
}
pub async fn connect(&mut self) -> Result<(), JsValue> {
self.connection_options.borrow_mut().auto_reconnect = true;
if self.auth_provider_cb.borrow().is_none()
&& matches!(&*self.auth.borrow(), WasmAuthProvider::Basic { .. })
{
if let Err(error) = self.reauthenticate_for_http().await {
let auth_user = {
let auth = self.auth.borrow();
auth_user_from_wasm_auth(&auth).map(str::to_owned)
};
emit_runtime_js_error(
&self.on_error_cb,
&error,
"Authentication failed",
&self.url,
auth_user.as_deref(),
);
return Err(error);
}
}
let resolved_auth = match resolve_auth_provider(
self.auth_provider_cb.borrow().clone(),
self.auth.borrow().clone(),
)
.await
{
Ok(auth) => auth,
Err(error) => {
let auth_user = {
let auth = self.auth.borrow();
auth_user_from_wasm_auth(&auth).map(str::to_owned)
};
emit_runtime_js_error(
&self.on_error_cb,
&error,
"Authentication failed",
&self.url,
auth_user.as_deref(),
);
return Err(error);
},
};
if matches!(resolved_auth, WasmAuthProvider::Basic { .. }) {
let error = JsValue::from_str(
"WebSocket authentication requires a JWT token. Use KalamClient.withJwt, login \
first, or set an authProvider.",
);
emit_runtime_js_error(
&self.on_error_cb,
&error,
"Authentication failed",
&self.url,
auth_user_from_wasm_auth(&resolved_auth),
);
return Err(error);
}
if self.is_connected() {
wasm_debug_log!("KalamClient: Already connected, skipping reconnection");
return Ok(());
}
wasm_debug_log!("KalamClient: Connecting to WebSocket...");
let disable_compression = self.connection_options.borrow().disable_compression;
let ws_url = match super::helpers::ws_url_from_http_opts(&self.url, disable_compression) {
Ok(url) => url,
Err(error) => {
emit_runtime_js_error(
&self.on_error_cb,
&error,
"Failed to connect to KalamDB",
&self.url,
auth_user_from_wasm_auth(&resolved_auth),
);
return Err(error);
},
};
let ws = match WebSocket::new(&ws_url) {
Ok(ws) => ws,
Err(error) => {
emit_runtime_js_error(
&self.on_error_cb,
&error,
"Failed to connect to KalamDB",
&self.url,
auth_user_from_wasm_auth(&resolved_auth),
);
return Err(error);
},
};
ws.set_binary_type(web_sys::BinaryType::Arraybuffer);
let (connect_promise, connect_resolve, connect_reject) = create_promise();
let requires_auth = !matches!(resolved_auth, WasmAuthProvider::None);
let (auth_promise, auth_resolve, auth_reject) = create_promise();
let protocol_opts = self.connection_options.borrow().protocol.clone();
let auth_message = resolved_auth.to_ws_auth_message(protocol_opts);
let ws_clone_for_auth = ws.clone();
let auth_resolve_for_anon = auth_resolve.clone();
let on_send_for_open = Rc::clone(&self.on_send_cb);
let on_connect_for_open = Rc::clone(&self.on_connect_cb);
let connect_resolve_clone = connect_resolve.clone();
let onopen_callback = Closure::wrap(Box::new(move || {
wasm_debug_log!("KalamClient: WebSocket connected, sending authentication...");
if let Some(auth_msg) = &auth_message {
if let Ok(json) = serde_json::to_string(&auth_msg) {
if let Some(cb) = on_send_for_open.borrow().as_ref() {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&json));
}
if let Err(_e) = ws_clone_for_auth.send_with_str(&json) {
wasm_debug_log!(&format!(
"KalamClient: Failed to send auth message: {:?}",
_e
));
}
}
} else {
wasm_debug_log!("KalamClient: Anonymous connection, skipping authentication");
if let Some(cb) = on_connect_for_open.borrow().as_ref() {
let _ = cb.call0(&JsValue::NULL);
}
let _ = auth_resolve_for_anon.call0(&JsValue::NULL);
}
let _ = connect_resolve_clone.call0(&JsValue::NULL);
}) as Box<dyn FnMut()>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
let connect_reject_clone = connect_reject.clone();
let auth_reject_clone = auth_reject.clone();
let on_error_for_err = Rc::clone(&self.on_error_cb);
let subscriptions_for_error = Rc::clone(&self.subscription_state);
let ws_ref_for_error = Rc::clone(&self.ws);
let ping_interval_id_for_error = Rc::clone(&self.ping_interval_id);
let source_ws_for_error = ws.clone();
let error_url = self.url.clone();
let error_auth_user = auth_user_from_wasm_auth(&resolved_auth).map(str::to_owned);
let onerror_callback = Closure::wrap(Box::new(move |_e: ErrorEvent| {
wasm_debug_log!(&format!("KalamClient: WebSocket error: {:?}", _e));
if !clear_active_socket(
&ws_ref_for_error,
&source_ws_for_error,
&ping_interval_id_for_error,
) {
return;
}
let event = build_runtime_connection_error(
"Failed to connect to KalamDB",
"WebSocket connection failed",
true,
&error_url,
error_auth_user.as_deref(),
);
emit_runtime_ws_error(&on_error_for_err, &event);
reject_pending_subscriptions(
&subscriptions_for_error,
"WebSocket connection failed before the subscription was acknowledged",
);
let error_msg = JsValue::from_str(&event.message);
let _ = connect_reject_clone.call1(&JsValue::NULL, &error_msg);
let _ = auth_reject_clone.call1(&JsValue::NULL, &error_msg);
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
let on_disconnect_for_close = Rc::clone(&self.on_disconnect_cb);
let subscriptions_for_close = Rc::clone(&self.subscription_state);
let ws_ref_for_close = Rc::clone(&self.ws);
let ping_interval_id_for_close = Rc::clone(&self.ping_interval_id);
let source_ws_for_close = ws.clone();
let onclose_callback = Closure::wrap(Box::new(move |e: CloseEvent| {
wasm_debug_log!(&format!(
"KalamClient: WebSocket closed: code={}, reason={}",
e.code(),
e.reason()
));
if !clear_active_socket(
&ws_ref_for_close,
&source_ws_for_close,
&ping_interval_id_for_close,
) {
return;
}
if let Some(cb) = on_disconnect_for_close.borrow().as_ref() {
let reason_obj = js_sys::Object::new();
let _ = js_sys::Reflect::set(
&reason_obj,
&"message".into(),
&JsValue::from_str(&e.reason()),
);
let _ = js_sys::Reflect::set(
&reason_obj,
&"code".into(),
&JsValue::from_f64(e.code() as f64),
);
let _ = cb.call1(&JsValue::NULL, &reason_obj);
}
let close_message = if e.reason().is_empty() {
format!(
"WebSocket closed before the subscription was acknowledged (code {})",
e.code()
)
} else {
format!("WebSocket closed before the subscription was acknowledged: {}", e.reason())
};
reject_pending_subscriptions(&subscriptions_for_close, &close_message);
}) as Box<dyn FnMut(CloseEvent)>);
ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
self.setup_auto_reconnect(&ws);
let subscriptions = Rc::clone(&self.subscription_state);
let auth_resolve_clone = auth_resolve.clone();
let auth_reject_clone2 = auth_reject.clone();
let auth_handled = Rc::new(RefCell::new(!requires_auth)); let auth_handled_clone = Rc::clone(&auth_handled);
let on_receive_for_msg = Rc::clone(&self.on_receive_cb);
let on_send_for_msg = Rc::clone(&self.on_send_cb);
let on_connect_for_msg = Rc::clone(&self.on_connect_cb);
let on_error_for_msg = Rc::clone(&self.on_error_cb);
let negotiated_ser_for_msg = Rc::clone(&self.negotiated_ser);
let ws_for_next_batch = ws.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
let event: Option<ServerMessage> = (|| {
let data = e.data();
if data.is_instance_of::<js_sys::JsString>() {
let text: String = data.dyn_into::<js_sys::JsString>().ok()?.into();
if let Some(cb) = on_receive_for_msg.borrow().as_ref() {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&text));
}
serde_json::from_str::<ServerMessage>(&text).ok()
} else if negotiated_ser_for_msg.get() == SerializationType::MessagePack {
let raw = decode_ws_binary_payload(&e)?;
rmp_serde::from_slice::<ServerMessage>(&raw).ok()
} else {
let message = decode_ws_message(&e)?;
if let Some(cb) = on_receive_for_msg.borrow().as_ref() {
let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&message));
}
serde_json::from_str::<ServerMessage>(&message).ok()
}
})();
let event = match event {
Some(e) => e,
None => return,
};
if !*auth_handled_clone.borrow() {
match &event {
ServerMessage::AuthSuccess {
user: _user_id,
role: _role,
protocol,
} => {
wasm_debug_log!(&format!(
"KalamClient: Authentication successful - user_id: {}, role: {}",
_user_id, _role
));
negotiated_ser_for_msg.set(protocol.serialization);
*auth_handled_clone.borrow_mut() = true;
if let Some(cb) = on_connect_for_msg.borrow().as_ref() {
let _ = cb.call0(&JsValue::NULL);
}
let _ = auth_resolve_clone.call0(&JsValue::NULL);
return;
},
ServerMessage::AuthError { message: error_msg } => {
wasm_debug_log!(&format!(
"KalamClient: Authentication failed - {}",
error_msg
));
*auth_handled_clone.borrow_mut() = true;
if let Some(cb) = on_error_for_msg.borrow().as_ref() {
let err_obj = js_sys::Object::new();
let _ = js_sys::Reflect::set(
&err_obj,
&"message".into(),
&JsValue::from_str(&format!(
"Authentication failed: {}",
error_msg
)),
);
let _ = js_sys::Reflect::set(
&err_obj,
&"recoverable".into(),
&JsValue::FALSE,
);
let _ = cb.call1(&JsValue::NULL, &err_obj);
}
let error =
JsValue::from_str(&format!("Authentication failed: {}", error_msg));
let _ = auth_reject_clone2.call1(&JsValue::NULL, &error);
return;
},
_ => {},
}
}
if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) {
if let Some((subscription_id, last_seq_id)) = dispatch.invoke() {
let _ = send_next_batch_traced(
&ws_for_next_batch,
&subscription_id,
last_seq_id,
negotiated_ser_for_msg.get(),
&on_send_for_msg,
);
}
}
}) as Box<dyn FnMut(MessageEvent)>);
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
*self.ws.borrow_mut() = Some(ws);
wasm_debug_log!("KalamClient: Waiting for WebSocket to open...");
JsFuture::from(connect_promise).await?;
wasm_debug_log!("KalamClient: Waiting for authentication...");
JsFuture::from(auth_promise).await?;
wasm_debug_log!("KalamClient: WebSocket connection established and authenticated");
if !self.subscription_state.borrow().is_empty() {
resubscribe_all(
Rc::clone(&self.ws),
Rc::clone(&self.subscription_state),
self.negotiated_ser.get(),
Some(Rc::clone(&self.on_send_cb)),
)
.await;
}
self.start_ping_timer();
Ok(())
}
pub async fn disconnect(&mut self) -> Result<(), JsValue> {
wasm_debug_log!("KalamClient: Disconnecting from WebSocket...");
self.stop_ping_timer();
self.connection_options.borrow_mut().auto_reconnect = false;
if let Some(ws) = self.ws.borrow_mut().take() {
ws.set_onclose(None);
ws.set_onerror(None);
ws.set_onmessage(None);
ws.close()?;
if let Some(cb) = self.on_disconnect_cb.borrow().as_ref() {
let reason_obj = js_sys::Object::new();
let _ = js_sys::Reflect::set(
&reason_obj,
&"message".into(),
&JsValue::from_str("manual disconnect"),
);
let _ =
js_sys::Reflect::set(&reason_obj, &"code".into(), &JsValue::from_f64(1000.0));
let _ = cb.call1(&JsValue::NULL, &reason_obj);
}
}
self.subscription_state.borrow_mut().clear();
wasm_debug_log!("KalamClient: Disconnected");
Ok(())
}
#[wasm_bindgen(js_name = isConnected)]
pub fn is_connected(&self) -> bool {
self.ws.borrow().as_ref().is_some_and(|ws| ws.ready_state() == WebSocket::OPEN)
}
#[wasm_bindgen(js_name = setPingInterval)]
pub fn set_ping_interval(&self, ms: u32) {
self.connection_options.borrow_mut().ping_interval_ms = ms as u64;
}
#[wasm_bindgen(js_name = sendPing)]
pub fn send_ping(&self) -> Result<(), JsValue> {
if let Some(ws) = self.ws.borrow().as_ref() {
if ws.ready_state() == WebSocket::OPEN {
send_ws_message_traced(
ws,
&ClientMessage::Ping,
self.negotiated_ser.get(),
&self.on_send_cb,
)?;
}
}
Ok(())
}
#[wasm_bindgen(js_name = requestNextBatch)]
pub fn request_next_batch(&self, subscription_id: String) -> Result<(), JsValue> {
let last_seq_id = self
.subscription_state
.borrow()
.get(&subscription_id)
.map(|state| state.last_seq_id)
.ok_or_else(|| JsValue::from_str("Unknown subscription id"))?;
let Some(ws) = self.ws.borrow().as_ref().cloned() else {
return Err(JsValue::from_str("WebSocket connection is unavailable"));
};
if ws.ready_state() != WebSocket::OPEN {
return Err(JsValue::from_str("WebSocket connection is not open"));
}
send_next_batch_traced(
&ws,
&subscription_id,
last_seq_id,
self.negotiated_ser.get(),
&self.on_send_cb,
)
}
fn start_ping_timer(&self) {
self.stop_ping_timer();
let interval_ms = self.connection_options.borrow().ping_interval_ms;
if interval_ms == 0 {
return;
}
let ws_ref = Rc::clone(&self.ws);
let negotiated_ser_for_ping = Rc::clone(&self.negotiated_ser);
let ping_cb = Closure::wrap(Box::new(move || {
if let Some(ws) = ws_ref.borrow().as_ref() {
if ws.ready_state() == WebSocket::OPEN {
let _ =
send_ws_message(ws, &ClientMessage::Ping, negotiated_ser_for_ping.get());
}
}
}) as Box<dyn FnMut()>);
let id = super::helpers::global_set_interval(
ping_cb.as_ref().unchecked_ref(),
interval_ms as i32,
);
ping_cb.forget();
*self.ping_interval_id.borrow_mut() = id;
}
fn stop_ping_timer(&self) {
let id = *self.ping_interval_id.borrow();
if id >= 0 {
super::helpers::global_clear_interval(id);
*self.ping_interval_id.borrow_mut() = -1;
}
}
pub async fn insert(&self, table_name: String, data: String) -> Result<String, JsValue> {
validate_sql_identifier(&table_name, "Table name")?;
let parsed: serde_json::Value = serde_json::from_str(&data)
.map_err(|e| JsValue::from_str(&format!("Invalid JSON data: {}", e)))?;
let obj = parsed
.as_object()
.ok_or_else(|| JsValue::from_str("Data must be a JSON object"))?;
if obj.is_empty() {
return Err(JsValue::from_str("Cannot insert empty object"));
}
for key in obj.keys() {
validate_column_name(key)?;
}
let columns: Vec<String> = obj.keys().map(|k| format!("\"{}\"", k)).collect();
let values: Vec<String> = obj
.values()
.map(|v| match v {
serde_json::Value::Null => "NULL".to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
_ => format!("'{}'", v.to_string().replace('\'', "''")),
})
.collect();
let sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
quote_table_name(&table_name),
columns.join(", "),
values.join(", ")
);
let namespace_id = self.default_namespace.borrow().clone();
self.execute_sql_internal(&sql, None, namespace_id).await
}
pub async fn delete(&self, table_name: String, row_id: String) -> Result<(), JsValue> {
validate_sql_identifier(&table_name, "Table name")?;
validate_row_id(&row_id)?;
let sql = format!(
"DELETE FROM {} WHERE id = '{}'",
quote_table_name(&table_name),
row_id.replace('\'', "''")
);
let namespace_id = self.default_namespace.borrow().clone();
self.execute_sql_internal(&sql, None, namespace_id).await?;
Ok(())
}
pub async fn query(&self, sql: String) -> Result<String, JsValue> {
let namespace_id = self.default_namespace.borrow().clone();
self.execute_sql_internal(&sql, None, namespace_id).await
}
#[wasm_bindgen(js_name = queryWithParams)]
pub async fn query_with_params(
&self,
sql: String,
params: Option<String>,
) -> Result<String, JsValue> {
let parsed_params: Option<Vec<serde_json::Value>> = match params {
Some(p) if !p.is_empty() => Some(
serde_json::from_str(&p)
.map_err(|e| JsValue::from_str(&format!("Invalid params JSON: {}", e)))?,
),
_ => None,
};
let namespace_id = self.default_namespace.borrow().clone();
self.execute_sql_internal(&sql, parsed_params, namespace_id).await
}
#[wasm_bindgen(js_name = liveTable)]
pub async fn live_table(
&self,
table_name: String,
options: Option<String>,
callback: js_sys::Function,
) -> Result<String, JsValue> {
validate_sql_identifier(&table_name, "Table name")?;
let sql = format!("SELECT * FROM {}", quote_table_name(&table_name));
self.live(sql, options, callback).await
}
#[wasm_bindgen(js_name = liveEvents)]
pub async fn live_events(
&self,
sql: String,
options: Option<String>,
callback: js_sys::Function,
) -> Result<String, JsValue> {
let subscription_options: SubscriptionOptions = if let Some(opts_json) = options {
serde_json::from_str(&opts_json)
.map_err(|e| JsValue::from_str(&format!("Invalid options JSON: {}", e)))?
} else {
SubscriptionOptions::default()
};
let default_namespace = self.default_namespace.borrow().clone();
let qualified_sql = qualify_subscription_sql(&sql, default_namespace.as_deref());
self.register_subscription(
qualified_sql,
subscription_options,
callback,
SubscriptionCallbackMode::raw(),
)
.await
}
#[wasm_bindgen(js_name = live)]
pub async fn live(
&self,
sql: String,
options: Option<String>,
callback: js_sys::Function,
) -> Result<String, JsValue> {
let parsed_options = if let Some(opts_json) = options {
serde_json::from_str::<WasmLiveRowsOptions>(&opts_json)
.map_err(|e| JsValue::from_str(&format!("Invalid live rows options JSON: {}", e)))?
} else {
WasmLiveRowsOptions::default()
};
let default_namespace = self.default_namespace.borrow().clone();
let qualified_sql = qualify_subscription_sql(&sql, default_namespace.as_deref());
self.register_subscription(
qualified_sql,
parsed_options.subscription_options.unwrap_or_default(),
callback,
SubscriptionCallbackMode::live_rows(crate::subscription::LiveRowsConfig {
limit: parsed_options.limit,
key_columns: parsed_options.key_columns,
}),
)
.await
}
pub async fn unsubscribe(&self, subscription_id: String) -> Result<(), JsValue> {
if !self.is_connected() {
return Err(JsValue::from_str("Not connected to server. Call connect() first."));
}
self.subscription_state.borrow_mut().remove(&subscription_id);
if let Some(ws) = self.ws.borrow().as_ref() {
let unsubscribe_msg = ClientMessage::Unsubscribe {
subscription_id: subscription_id.clone(),
};
send_ws_message_traced(
ws,
&unsubscribe_msg,
self.negotiated_ser.get(),
&self.on_send_cb,
)?;
}
wasm_debug_log!(&format!("KalamClient: Unsubscribed from: {}", subscription_id));
Ok(())
}
#[wasm_bindgen(js_name = getSubscriptions)]
pub fn get_subscriptions(&self) -> JsValue {
let state = self.subscription_state.borrow();
let list: Vec<serde_json::Value> = state
.iter()
.map(|(id, entry)| {
serde_json::json!({
"id": id,
"query": entry.sql,
"lastSeqId": entry.last_seq_id.map(|s| s.as_i64().to_string()),
"closed": false,
})
})
.collect();
JsValue::from_str(&serde_json::to_string(&list).unwrap_or_else(|_| "[]".to_string()))
}
pub async fn login(&mut self) -> Result<JsValue, JsValue> {
let (user, password) = match &*self.auth.borrow() {
WasmAuthProvider::Basic { username, password } => (username.clone(), password.clone()),
_ => {
return Err(JsValue::from_str(
"login() requires user/password credentials. Create client with new \
KalamClient(url, user, password)",
))
},
};
let login_response = self.perform_basic_login(&user, &password).await?;
*self.auth.borrow_mut() = WasmAuthProvider::Jwt {
token: login_response.access_token.clone(),
};
wasm_debug_log!("KalamClient: Login successful, switched to JWT authentication");
serialize_json_to_js_value(&login_response, "login response")
}
pub async fn refresh_access_token(&mut self, refresh_token: &str) -> Result<JsValue, JsValue> {
let url = format!("{}/v1/api/auth/refresh", self.url);
let auth_header = format!("Bearer {}", refresh_token);
let json_str = super::helpers::wasm_fetch(
&url,
"POST",
None,
&[("Authorization", &auth_header)],
"Token refresh failed",
)
.await?;
let login_response: crate::models::LoginResponse = serde_json::from_str(&json_str)
.map_err(|e| JsValue::from_str(&format!("Failed to parse refresh response: {}", e)))?;
*self.auth.borrow_mut() = WasmAuthProvider::Jwt {
token: login_response.access_token.clone(),
};
wasm_debug_log!("KalamClient: Token refreshed, updated JWT authentication");
serialize_json_to_js_value(&login_response, "refresh response")
}
async fn execute_sql_internal(
&self,
sql: &str,
params: Option<Vec<serde_json::Value>>,
namespace_id: Option<String>,
) -> Result<String, JsValue> {
if matches!(&*self.auth.borrow(), WasmAuthProvider::Basic { .. }) {
self.reauthenticate_for_http().await?;
}
let result = self.execute_sql_http(sql, ¶ms, namespace_id.as_deref()).await;
match result {
Ok(result_str) => {
if let Ok(query_resp) =
serde_json::from_str::<crate::query::models::QueryResponse>(&result_str)
{
if query_resp.is_token_expired() {
wasm_debug_log!(
"KalamClient: TOKEN_EXPIRED detected — reauthenticating and retrying \
query",
);
self.reauthenticate_for_http().await?;
return self.execute_sql_http(sql, ¶ms, namespace_id.as_deref()).await;
}
}
Ok(result_str)
},
Err(err) => {
if let Some(err_str) = err.as_string() {
if let Ok(query_resp) =
serde_json::from_str::<crate::query::models::QueryResponse>(&err_str)
{
if query_resp.is_token_expired() {
wasm_debug_log!(
"KalamClient: TOKEN_EXPIRED detected in HTTP error — \
reauthenticating and retrying query",
);
self.reauthenticate_for_http().await?;
return self
.execute_sql_http(sql, ¶ms, namespace_id.as_deref())
.await;
}
}
}
Err(err)
},
}
}
async fn execute_sql_http(
&self,
sql: &str,
params: &Option<Vec<serde_json::Value>>,
namespace_id: Option<&str>,
) -> Result<String, JsValue> {
let body = BorrowedQueryRequest {
sql,
params: params.as_deref(),
namespace_id,
};
let body_str = serde_json::to_string(&body)
.map_err(|e| JsValue::from_str(&format!("Serialization error: {}", e)))?;
let url = format!("{}/v1/api/sql", self.url);
let auth_header = self.auth.borrow().to_http_header();
let mut hdrs: Vec<(&str, &str)> = vec![("Content-Type", "application/json")];
if let Some(ref ah) = auth_header {
hdrs.push(("Authorization", ah));
}
let raw =
super::helpers::wasm_fetch(&url, "POST", Some(&body_str), &hdrs, "HTTP error").await?;
match serde_json::from_str::<crate::query::models::QueryResponse>(&raw) {
Ok(mut query_resp) => {
for result in &mut query_resp.results {
result.populate_named_rows();
}
serde_json::to_string(&query_resp)
.map_err(|e| JsValue::from_str(&format!("Serialization error: {}", e)))
},
Err(_) => Ok(raw),
}
}
async fn reauthenticate_for_http(&self) -> Result<(), JsValue> {
if let Some(cb) = self.auth_provider_cb.borrow().clone() {
let resolved_auth = resolve_auth_provider(Some(cb), WasmAuthProvider::None).await?;
let _log_message = match &resolved_auth {
WasmAuthProvider::Jwt { .. } => {
"KalamClient: Reauthenticated via authProvider (JWT)"
},
WasmAuthProvider::None => "KalamClient: authProvider returned no credentials",
WasmAuthProvider::Basic { .. } => unreachable!(),
};
*self.auth.borrow_mut() = resolved_auth;
wasm_debug_log!(_log_message);
return Ok(());
}
let basic = {
let auth = self.auth.borrow();
match &*auth {
WasmAuthProvider::Basic { username, password } => {
Some((username.clone(), password.clone()))
},
_ => None,
}
};
if let Some((username, password)) = basic {
let login_resp = self.perform_basic_login(&username, &password).await?;
*self.auth.borrow_mut() = WasmAuthProvider::Jwt {
token: login_resp.access_token,
};
wasm_debug_log!("KalamClient: Reauthenticated via basic login");
return Ok(());
}
Err(JsValue::from_str(
"Cannot reauthenticate: no authProvider callback or basic credentials available",
))
}
async fn perform_basic_login(
&self,
user: &str,
password: &str,
) -> Result<crate::models::LoginResponse, JsValue> {
let body = serde_json::json!({
"user": user,
"password": password,
});
let body_str = body.to_string();
let url = format!("{}/v1/api/auth/login", self.url);
let json_str = super::helpers::wasm_fetch(
&url,
"POST",
Some(&body_str),
&[("Content-Type", "application/json")],
"Login failed",
)
.await?;
serde_json::from_str(&json_str)
.map_err(|e| JsValue::from_str(&format!("Failed to parse login response: {}", e)))
}
fn setup_auto_reconnect(&self, ws: &WebSocket) {
install_auto_reconnect_listener(
ws,
Rc::clone(&self.connection_options),
Rc::clone(&self.subscription_state),
Rc::clone(&self.reconnect_attempts),
Rc::clone(&self.is_reconnecting),
Rc::clone(&self.ws),
Rc::clone(&self.ping_interval_id),
self.url.clone(),
self.auth.borrow().clone(),
Rc::clone(&self.auth_provider_cb),
Rc::clone(&self.on_connect_cb),
Rc::clone(&self.on_disconnect_cb),
Rc::clone(&self.on_error_cb),
Rc::clone(&self.on_receive_cb),
Rc::clone(&self.on_send_cb),
Rc::clone(&self.negotiated_ser),
);
}
}
#[allow(clippy::too_many_arguments)]
fn install_auto_reconnect_listener(
ws: &WebSocket,
connection_options: Rc<RefCell<ConnectionOptions>>,
subscription_state: Rc<RefCell<HashMap<String, SubscriptionState>>>,
reconnect_attempts: Rc<RefCell<u32>>,
is_reconnecting: Rc<RefCell<bool>>,
ws_ref: Rc<RefCell<Option<WebSocket>>>,
ping_interval_id: Rc<RefCell<i32>>,
url: String,
auth: WasmAuthProvider,
auth_provider_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_connect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_disconnect_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_error_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_receive_cb: Rc<RefCell<Option<js_sys::Function>>>,
on_send_cb: Rc<RefCell<Option<js_sys::Function>>>,
negotiated_ser: Rc<Cell<SerializationType>>,
) {
let source_ws = ws.clone();
let onclose_reconnect = Closure::wrap(Box::new(move |_e: CloseEvent| {
let is_active_socket = ws_ref
.borrow()
.as_ref()
.is_some_and(|current_ws| js_sys::Object::is(current_ws.as_ref(), source_ws.as_ref()));
if !is_active_socket {
return;
}
schedule_auto_reconnect(
Rc::clone(&connection_options),
Rc::clone(&subscription_state),
Rc::clone(&reconnect_attempts),
Rc::clone(&is_reconnecting),
Rc::clone(&ws_ref),
Rc::clone(&ping_interval_id),
url.clone(),
auth.clone(),
Rc::clone(&auth_provider_cb),
Rc::clone(&on_connect_cb),
Rc::clone(&on_disconnect_cb),
Rc::clone(&on_error_cb),
Rc::clone(&on_receive_cb),
Rc::clone(&on_send_cb),
Rc::clone(&negotiated_ser),
);
}) as Box<dyn FnMut(CloseEvent)>);
ws.add_event_listener_with_callback("close", onclose_reconnect.as_ref().unchecked_ref())
.ok();
onclose_reconnect.forget();
}
#[cfg(test)]
mod tests {
use super::{normalize_default_namespace, qualify_subscription_sql};
#[test]
fn normalize_default_namespace_trims_and_drops_empty_values() {
assert_eq!(
normalize_default_namespace(Some(" chat ".to_string())),
Some("chat".to_string())
);
assert_eq!(normalize_default_namespace(Some(" ".to_string())), None);
assert_eq!(normalize_default_namespace(None), None);
}
#[test]
fn qualify_subscription_sql_uses_default_namespace_for_unqualified_relations() {
let sql = qualify_subscription_sql("SELECT * FROM messages WHERE id = 1", Some("chat"));
assert_eq!(sql, "SELECT * FROM \"chat\".messages WHERE id = 1");
}
#[test]
fn qualify_subscription_sql_preserves_explicit_namespace() {
let sql = qualify_subscription_sql("SELECT * FROM billing.messages", Some("chat"));
assert_eq!(sql, "SELECT * FROM billing.messages");
}
}