use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{mpsc, oneshot},
time::Instant as TokioInstant,
};
use crate::{
connection::FAR_FUTURE,
error::Result,
models::{ChangeEvent, SubscriptionInfo, SubscriptionOptions},
seq_tracking,
subscription::final_resume_seq,
timeouts::KalamLinkTimeouts,
SeqId,
};
/// Result reported back to the caller that requested a subscription.
///
/// The success payload has the same shape as the `(generation, resume_seq)`
/// tuple returned by `register_subscription_entry`.
/// NOTE(review): presumably it carries exactly those values — confirm at the
/// site that completes the sender.
pub(super) type SubscriptionReady = Result<(u64, Option<SeqId>)>;
/// One-shot sender used to deliver a single [`SubscriptionReady`] value.
type SubscriptionReadySender = oneshot::Sender<SubscriptionReady>;
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
#[inline]
pub(super) fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
/// Builds a listing of all subscriptions known to the connection.
///
/// Every entry in `subs` is reported as open (`closed: false`) with its SQL,
/// effective sequence id and timestamps. Every id that exists only in
/// `seq_id_cache` (i.e. has no entry in `subs`) is appended as a closed
/// placeholder carrying just the cached sequence id, an empty query and a
/// zero creation time.
pub(super) fn snapshot_subscriptions(
    subs: &HashMap<String, SubEntry>,
    seq_id_cache: &HashMap<String, SeqId>,
) -> Vec<SubscriptionInfo> {
    // Live subscriptions come first.
    let active = subs.iter().map(|(sub_id, entry)| SubscriptionInfo {
        id: sub_id.clone(),
        query: entry.sql.clone(),
        last_seq_id: effective_entry_seq(entry),
        last_event_time_ms: entry.last_event_time_ms,
        created_at_ms: entry.created_at_ms,
        closed: false,
    });

    // Then cached ids that no longer have a live entry.
    let closed = seq_id_cache
        .iter()
        .filter(|(sub_id, _)| !subs.contains_key(sub_id.as_str()))
        .map(|(sub_id, &cached_seq)| SubscriptionInfo {
            id: sub_id.clone(),
            query: String::new(),
            last_seq_id: Some(cached_seq),
            last_event_time_ms: None,
            created_at_ms: 0,
            closed: true,
        });

    active.chain(closed).collect()
}
pub(super) fn effective_entry_seq(entry: &SubEntry) -> Option<SeqId> {
final_resume_seq(entry.last_seq_id, entry.consumed_seq_id)
}
/// Stores the entry's effective sequence id in `seq_id_cache` under `id`.
///
/// Does nothing (and does not convert `id`) when the entry has no effective
/// sequence id. An existing cache value for the same id is overwritten.
pub(super) fn cache_entry_seq(
    seq_id_cache: &mut HashMap<String, SeqId>,
    id: impl Into<String>,
    entry: &SubEntry,
) {
    let Some(resume_seq) = effective_entry_seq(entry) else {
        return;
    };
    seq_id_cache.insert(id.into(), resume_seq);
}
/// Combines the caller-supplied `options.from` with an inherited sequence id.
///
/// When both are present the larger of the two wins; when only one is present
/// that one is used; when neither is present the result is `None`. The chosen
/// value is written back into `options.from` and also returned.
pub(super) fn merge_resume_from(
    options: &mut SubscriptionOptions,
    inherited_seq: Option<SeqId>,
) -> Option<SeqId> {
    let merged = match (options.from, inherited_seq) {
        (Some(requested), Some(remembered)) => Some(requested.max(remembered)),
        (Some(requested), None) => Some(requested),
        (None, remembered) => remembered,
    };
    options.from = merged;
    merged
}
/// Reports whether a subscribe request needs to carry its options.
///
/// Returns `true` when initial data was requested, or when any of
/// `batch_size`, `last_rows` or `from` is set on `options`.
pub(super) fn should_send_subscription_options(
    request_initial_data: bool,
    options: &SubscriptionOptions,
) -> bool {
    if request_initial_data {
        return true;
    }
    let has_paging_hint = options.batch_size.is_some() || options.last_rows.is_some();
    has_paging_hint || options.from.is_some()
}
/// Creates the bookkeeping entry for a new subscription and stores it in `subs`.
///
/// Steps, in order:
/// 1. Any cached sequence id for `id` is removed from `seq_id_cache` and merged
///    into `options.from` via `merge_resume_from`.
/// 2. The current value of `next_generation` is assigned to the entry and the
///    counter is incremented.
/// 3. A `SubEntry` in the loading state is inserted under `id`, replacing any
///    entry already stored under that key.
///
/// Returns `(generation, effective_from)` for the new entry.
#[allow(clippy::too_many_arguments)]
pub(super) fn register_subscription_entry(
    subs: &mut HashMap<String, SubEntry>,
    seq_id_cache: &mut HashMap<String, SeqId>,
    next_generation: &mut u64,
    timeouts: &KalamLinkTimeouts,
    id: String,
    sql: String,
    mut options: SubscriptionOptions,
    request_initial_data: bool,
    event_tx: mpsc::Sender<Result<ChangeEvent>>,
    result_tx: SubscriptionReadySender,
) -> (u64, Option<SeqId>) {
    // Pull the remembered position (if any) out of the cache; the live entry
    // becomes the source of truth from here on.
    let cached_seq = seq_id_cache.remove(&id);
    let resume_from = merge_resume_from(&mut options, cached_seq);

    let generation = *next_generation;
    *next_generation += 1;

    let created_at_ms = now_ms();
    let ready_deadline = startup_deadline(timeouts);

    let entry = SubEntry {
        sql,
        options,
        request_initial_data,
        event_tx,
        // Both trackers start at the merged resume position.
        last_seq_id: resume_from,
        consumed_seq_id: resume_from,
        batch_seq_id: None,
        is_loading: true,
        generation,
        created_at_ms,
        last_event_time_ms: None,
        pending_result_tx: Some(result_tx),
        ready_deadline,
        reconnect_resubscribe_pending: false,
    };
    subs.insert(id, entry);

    (generation, resume_from)
}
/// Removes the subscription stored under `id` and remembers its position.
///
/// When `generation` is `Some`, the entry is removed only if it exists and its
/// generation equals the expected one; otherwise nothing changes and `None` is
/// returned. When `generation` is `None`, whatever entry exists is removed.
///
/// On removal the entry's effective sequence id (if any) is written to
/// `seq_id_cache` under the same id, and the removed entry is returned.
pub(super) fn remove_subscription_entry(
    subs: &mut HashMap<String, SubEntry>,
    seq_id_cache: &mut HashMap<String, SeqId>,
    id: &str,
    generation: Option<u64>,
) -> Option<SubEntry> {
    if let Some(expected_generation) = generation {
        // A missing entry or a generation mismatch both mean "leave it alone".
        let current = subs.get(id)?;
        if current.generation != expected_generation {
            return None;
        }
    }

    let removed = subs.remove(id)?;
    cache_entry_seq(seq_id_cache, id, &removed);
    Some(removed)
}
/// Records progress reported for a subscription entry.
///
/// The update is ignored when `generation` does not match the entry's own
/// generation. Otherwise `consumed_seq_id` is advanced with `seq_id`,
/// `last_seq_id` is advanced too when `advance_resume` is set, and
/// `last_event_time_ms` is stamped with the current time.
pub(super) fn advance_entry_progress(
    entry: &mut SubEntry,
    generation: u64,
    seq_id: SeqId,
    advance_resume: bool,
) {
    if entry.generation == generation {
        seq_tracking::advance_seq(&mut entry.consumed_seq_id, seq_id);
        if advance_resume {
            seq_tracking::advance_seq(&mut entry.last_seq_id, seq_id);
        }
        entry.last_event_time_ms = Some(now_ms());
    }
}
/// Computes the instant by which a newly started subscription must be ready.
///
/// Returns `None` when `initial_data_timeout` is the "no timeout" value,
/// otherwise now plus `initial_data_timeout`.
pub(super) fn startup_deadline(timeouts: &KalamLinkTimeouts) -> Option<TokioInstant> {
    let budget = timeouts.initial_data_timeout;
    let is_bounded = !KalamLinkTimeouts::is_no_timeout(budget);
    is_bounded.then(|| TokioInstant::now() + budget)
}
/// Computes the readiness deadline used when a subscription is resumed.
///
/// Returns `None` when `initial_data_timeout` is the "no timeout" value.
/// Otherwise the budget is `initial_data_timeout`, capped by
/// `subscribe_timeout` unless that one is itself the "no timeout" value, and
/// the deadline is now plus that budget.
pub(super) fn resume_startup_deadline(timeouts: &KalamLinkTimeouts) -> Option<TokioInstant> {
    let initial_data = timeouts.initial_data_timeout;
    let subscribe = timeouts.subscribe_timeout;

    if KalamLinkTimeouts::is_no_timeout(initial_data) {
        None
    } else {
        let budget = if KalamLinkTimeouts::is_no_timeout(subscribe) {
            initial_data
        } else {
            initial_data.min(subscribe)
        };
        Some(TokioInstant::now() + budget)
    }
}
/// Re-arms the entry's readiness deadline from scratch.
///
/// Uses `resume_startup_deadline` when `is_resume` is set and
/// `startup_deadline` otherwise, and records `is_resume` in
/// `reconnect_resubscribe_pending`.
pub(super) fn reset_startup_deadline(
    entry: &mut SubEntry,
    timeouts: &KalamLinkTimeouts,
    is_resume: bool,
) {
    entry.reconnect_resubscribe_pending = is_resume;
    entry.ready_deadline = match is_resume {
        true => resume_startup_deadline(timeouts),
        false => startup_deadline(timeouts),
    };
}
/// Pushes out an already-armed readiness deadline.
///
/// An entry without a deadline is left untouched. Otherwise the deadline is
/// replaced with a fresh `startup_deadline` value, which may itself be `None`
/// if the configured timeout is "no timeout".
pub(super) fn refresh_startup_deadline(entry: &mut SubEntry, timeouts: &KalamLinkTimeouts) {
    if entry.ready_deadline.is_none() {
        return;
    }
    entry.ready_deadline = startup_deadline(timeouts);
}
pub(super) fn clear_startup_deadline(entry: &mut SubEntry) {
entry.ready_deadline = None;
entry.reconnect_resubscribe_pending = false;
}
/// Returns the earliest readiness deadline across all subscriptions.
///
/// Entries without a deadline are skipped. When no entry has one, the result
/// is now plus `FAR_FUTURE`, so callers always get a concrete instant.
pub(super) fn next_startup_deadline(subs: &HashMap<String, SubEntry>) -> TokioInstant {
    let earliest = subs
        .values()
        .filter_map(|entry| entry.ready_deadline)
        .min();
    match earliest {
        Some(deadline) => deadline,
        None => TokioInstant::now() + FAR_FUTURE,
    }
}
/// Describes how an incoming subscription id maps to a key in the
/// subscription map.
pub(super) enum SubscriptionKeyMatch {
    /// The incoming id is itself the map key.
    Direct,
    /// The incoming id is not a key; the contained string is the key to use.
    Fallback(String),
}

impl SubscriptionKeyMatch {
    /// Returns the map key this match refers to.
    ///
    /// For [`SubscriptionKeyMatch::Direct`] that is `incoming_sub_id` itself;
    /// for [`SubscriptionKeyMatch::Fallback`] it is the stored key.
    #[inline]
    pub(super) fn as_str<'a>(&'a self, incoming_sub_id: &'a str) -> &'a str {
        if let Self::Fallback(stored_key) = self {
            stored_key.as_str()
        } else {
            incoming_sub_id
        }
    }
}
/// Finds the key in `subs` that an incoming subscription id refers to.
///
/// Resolution order:
/// 1. If `sub_id` is itself a key, the match is `Direct`.
/// 2. Otherwise every key that `sub_id` ends with is a candidate. Exactly one
///    candidate yields `Fallback` with that key; zero or several candidates
///    yield `None`, so an ambiguous id is never guessed at.
pub(super) fn resolve_subscription_key(
    sub_id: &str,
    subs: &HashMap<String, SubEntry>,
) -> Option<SubscriptionKeyMatch> {
    if subs.contains_key(sub_id) {
        return Some(SubscriptionKeyMatch::Direct);
    }

    let mut candidates = subs
        .keys()
        .filter(|key| sub_id.ends_with(key.as_str()));

    // Pull at most two candidates: one to use, one to detect ambiguity.
    match (candidates.next(), candidates.next()) {
        (Some(only_match), None) => Some(SubscriptionKeyMatch::Fallback(only_match.clone())),
        _ => None,
    }
}
/// Commands understood by the connection's subscription bookkeeping.
///
/// NOTE(review): the code that receives and dispatches these commands is not
/// in this part of the file; the per-variant notes below describe the payload
/// only and point at the helper whose parameters it mirrors.
pub(super) enum ConnCmd {
/// Request to start a subscription. The fields mirror the parameters of
/// `register_subscription_entry`.
Subscribe {
/// Key under which the subscription is tracked.
id: String,
/// SQL text of the subscription query.
sql: String,
/// Caller-supplied subscription options.
options: SubscriptionOptions,
/// Whether initial data was requested for this subscription.
request_initial_data: bool,
/// Channel on which change events (or errors) are sent for this subscription.
event_tx: mpsc::Sender<Result<ChangeEvent>>,
/// One-shot channel for reporting the subscribe outcome.
result_tx: SubscriptionReadySender,
},
/// Request to drop a subscription. The fields mirror the `id` and
/// `generation` parameters of `remove_subscription_entry`.
Unsubscribe {
/// Key of the subscription to remove.
id: String,
/// Optional generation the entry is expected to have.
generation: Option<u64>,
},
/// Progress report for a subscription. Apart from `id`, the fields mirror
/// the parameters of `advance_entry_progress`.
Progress {
/// Key of the subscription that progressed.
id: String,
/// Generation of the entry the report belongs to.
generation: u64,
/// Sequence id that was reached.
seq_id: SeqId,
/// Whether the resume position should be advanced as well.
advance_resume: bool,
},
/// Request for a listing of subscriptions, answered on `result_tx`.
ListSubscriptions {
/// One-shot channel that receives the listing.
result_tx: oneshot::Sender<Vec<SubscriptionInfo>>,
},
/// Shutdown request. NOTE(review): exact teardown behavior is defined by
/// the command handler, not visible here.
Shutdown,
}
/// Bookkeeping state kept for one subscription, stored in the subscription
/// map under the subscription's id.
pub(super) struct SubEntry {
/// SQL text of the subscription query.
pub(super) sql: String,
/// Subscription options; `from` holds the merged resume position after
/// registration.
pub(super) options: SubscriptionOptions,
/// Whether initial data was requested when the subscription was registered.
pub(super) request_initial_data: bool,
/// Channel on which change events (or errors) are sent for this subscription.
pub(super) event_tx: mpsc::Sender<Result<ChangeEvent>>,
/// Resume-position tracker; advanced by `advance_entry_progress` only when
/// `advance_resume` is set.
pub(super) last_seq_id: Option<SeqId>,
/// Consumed-position tracker; advanced by `advance_entry_progress` on every
/// accepted progress report.
pub(super) consumed_seq_id: Option<SeqId>,
/// Starts as `None` at registration.
/// NOTE(review): presumably tracks the sequence id of an in-flight batch —
/// confirm against the code that writes it.
pub(super) batch_seq_id: Option<SeqId>,
/// Starts as `true` at registration.
/// NOTE(review): presumably cleared once initial loading finishes — confirm.
pub(super) is_loading: bool,
/// Generation assigned at registration; used to tell apart successive
/// entries registered under the same id.
pub(super) generation: u64,
/// Wall-clock creation time in milliseconds since the Unix epoch.
pub(super) created_at_ms: u64,
/// Wall-clock time of the last accepted progress report, if any.
pub(super) last_event_time_ms: Option<u64>,
/// Sender for the subscribe outcome; set at registration.
/// NOTE(review): presumably taken when the outcome is delivered — confirm.
pub(super) pending_result_tx: Option<SubscriptionReadySender>,
/// Instant by which the subscription must become ready; `None` when no
/// deadline is armed.
pub(super) ready_deadline: Option<TokioInstant>,
/// Set by `reset_startup_deadline` to its `is_resume` argument and cleared
/// by `clear_startup_deadline`.
pub(super) reconnect_resubscribe_pending: bool,
}