use crate::{
models::{ChangeEvent, ServerMessage, SubscriptionOptions},
seq_tracking,
subscription::{LiveRowsConfig, LiveRowsMaterializer},
SeqId,
};
/// Selects how `callback_payload` turns a change event into the JSON string it returns.
#[derive(Clone)]
pub(crate) enum SubscriptionCallbackMode {
/// The event is converted with `to_server_message()` and that message is serialized as-is.
RawEvents,
/// The event is fed through `materializer`; the update it yields (if any) is serialized
/// as a `WasmLiveRowsEvent`.
LiveRows { materializer: LiveRowsMaterializer },
}
impl SubscriptionCallbackMode {
pub(crate) fn raw() -> Self {
Self::RawEvents
}
pub(crate) fn live_rows(config: LiveRowsConfig) -> Self {
Self::LiveRows {
materializer: LiveRowsMaterializer::new(config),
}
}
}
/// Per-subscription bookkeeping record.
///
/// NOTE(review): the field descriptions below are inferred from names and types only; the
/// code that reads and writes them is outside this section — confirm against the callers.
#[derive(Clone)]
pub(crate) struct SubscriptionState {
/// Presumably the SQL text this subscription was created with.
pub(crate) sql: String,
/// Subscription options; `filter_subscription_event` reads `options.from` from a value of
/// this type.
pub(crate) options: SubscriptionOptions,
/// JavaScript function associated with the subscription (presumably invoked per event).
pub(crate) callback: js_sys::Function,
/// Latest sequence id recorded so far, or `None` if none has been recorded.
/// `track_subscription_checkpoint` updates a value of this shape.
pub(crate) last_seq_id: Option<SeqId>,
/// Presumably the `resolve` half of a pending JS subscribe promise — TODO confirm.
pub(crate) pending_subscribe_resolve: Option<js_sys::Function>,
/// Presumably the `reject` half of a pending JS subscribe promise — TODO confirm.
pub(crate) pending_subscribe_reject: Option<js_sys::Function>,
/// Presumably `true` until the first server response for the subscription arrives —
/// TODO confirm.
pub(crate) awaiting_initial_response: bool,
/// Selects how events are turned into callback payloads (see `callback_payload`).
pub(crate) callback_mode: SubscriptionCallbackMode,
}
/// Serializable form of a live-rows update, produced by `callback_payload` in `LiveRows` mode.
///
/// Serialized as an internally tagged enum: the variant name is written in snake_case under
/// the `"type"` key (`"rows"` / `"error"`), alongside the variant's own fields.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(crate) enum WasmLiveRowsEvent {
/// A row-set update copied field-for-field from `LiveRowsEvent::Rows`.
Rows {
subscription_id: String,
rows: Vec<crate::models::RowData>,
last_seq_id: Option<SeqId>,
},
/// An error report copied field-for-field from `LiveRowsEvent::Error`.
Error {
subscription_id: String,
code: String,
message: String,
},
}
/// Deserializable options bag for live-rows subscriptions; every field is optional.
///
/// NOTE(review): how each option is consumed is not visible in this section — the field
/// notes below are inferred from names and should be confirmed against the caller.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub(crate) struct WasmLiveRowsOptions {
/// Presumably a cap on the number of materialized rows.
pub(crate) limit: Option<usize>,
/// Presumably the column names used to identify a row.
pub(crate) key_columns: Option<Vec<String>>,
/// Optional underlying subscription options.
pub(crate) subscription_options: Option<SubscriptionOptions>,
}
#[inline]
pub(crate) fn track_subscription_checkpoint(last_seq_id: &mut Option<SeqId>, event: &ChangeEvent) {
match event {
ChangeEvent::Ack { batch_control, .. } => {
if let Some(seq_id) = batch_control.last_seq_id {
seq_tracking::advance_seq(last_seq_id, seq_id);
}
},
ChangeEvent::InitialDataBatch {
rows,
batch_control,
..
} => {
if let Some(seq_id) = batch_control.last_seq_id {
seq_tracking::advance_seq(last_seq_id, seq_id);
}
seq_tracking::track_rows(last_seq_id, rows);
},
ChangeEvent::Insert { rows, .. } => {
seq_tracking::track_rows(last_seq_id, rows);
},
ChangeEvent::Update { rows, old_rows, .. } => {
seq_tracking::track_rows(last_seq_id, rows);
seq_tracking::track_rows(last_seq_id, old_rows);
},
ChangeEvent::Delete { old_rows, .. } => {
seq_tracking::track_rows(last_seq_id, old_rows);
},
ChangeEvent::Error { .. } | ChangeEvent::Unknown { .. } => {},
}
}
/// Converts a server message into a `ChangeEvent` and runs it through the replay filter.
///
/// Returns `None` when `ChangeEvent::from_server_message` yields nothing for `event`, or when
/// `filter_replayed_event` drops the converted event for the given `options.from`.
/// The message is cloned because the conversion takes it by value.
#[inline]
pub(crate) fn filter_subscription_event(
    options: &SubscriptionOptions,
    event: &ServerMessage,
) -> Option<ChangeEvent> {
    ChangeEvent::from_server_message(event.clone())
        .and_then(|converted| crate::subscription::filter_replayed_event(converted, options.from))
}
#[inline]
pub(crate) fn callback_payload(
mode: &mut SubscriptionCallbackMode,
event: &ChangeEvent,
) -> Option<String> {
match mode {
SubscriptionCallbackMode::RawEvents => {
serde_json::to_string(&event.to_server_message()).ok()
},
SubscriptionCallbackMode::LiveRows { materializer } => {
let update = materializer.apply(event.clone())?;
let wasm_event = match update {
crate::subscription::LiveRowsEvent::Rows {
subscription_id,
rows,
last_seq_id,
} => WasmLiveRowsEvent::Rows {
subscription_id,
rows,
last_seq_id,
},
crate::subscription::LiveRowsEvent::Error {
subscription_id,
code,
message,
} => WasmLiveRowsEvent::Error {
subscription_id,
code,
message,
},
};
serde_json::to_string(&wasm_event).ok()
},
}
}