use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::OnceLock;
use actix_web::web::Data;
use serde_json::Value;
use sqlx::{Pool, Postgres};
use tokio::sync::{Mutex, MutexGuard, Notify, oneshot};
use tokio::time::Instant;
use crate::AppState;
use crate::api::cache::invalidation::invalidate_scoped_gateway_cache;
use crate::drivers::postgresql::sqlx_driver::insert_rows_bulk;
/// One queued insert request waiting inside the batching window.
///
/// Carries everything needed to (a) execute the insert, (b) emit request
/// logging/metrics, and (c) drive post-insert cache invalidation and event
/// publishing.
pub struct WindowInsertJob {
    /// Correlation id for this request's logs/traces.
    pub trace_id: String,
    pub user_id: String,
    pub company_id: String,
    pub organization_id: String,
    // Optional metadata-scoped identity overrides; None when not supplied.
    pub metadata_user_id: Option<String>,
    pub metadata_company_id: Option<String>,
    pub metadata_organization_id: Option<String>,
    /// Full request body; inspected by `MergeLaneKey::for_job` for an
    /// update clause when deciding merge eligibility.
    pub body: Value,
    pub table_name: String,
    /// The column/value object actually inserted. Its sorted key set is the
    /// lane fingerprint, so only identically-shaped rows batch together.
    pub insert_body: Value,
    pub resource_id_key: String,
    /// Logical client name; selects the Postgres pool from `pg_registry`.
    pub client_name: String,
    /// Caller's hint that this row may be merged into a bulk insert.
    pub merge_eligible: bool,
    // Request-scoped logging fields captured at admission time.
    pub logged_request_id: String,
    pub logged_client_name: String,
    pub logged_method: String,
    pub logged_path: String,
    /// When the overall operation began; used for the "queue_wait" phase metric.
    pub operation_start: std::time::Instant,
    pub verbose_logging: bool,
    pub ansi_enabled: bool,
    // Presumably controls post-insert event publishing — not exercised in
    // this file; verify against the outcome-building helpers.
    pub x_publish_event: bool,
    pub resolved_company_for_event: Option<String>,
    /// Deadline after which the worker considers this job due for flushing.
    pub due: tokio::time::Instant,
}
/// Batching lane: due jobs flush together only when they share a lane —
/// same client, same table, same insert-column set, same merge verdict.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
struct MergeLaneKey {
    client_name: String,
    table_name: String,
    /// Sorted insert-body keys joined with U+001F (see `insert_body_fingerprint`).
    column_fingerprint: String,
    /// When false, every job in this lane takes the single-insert path.
    merge_eligible: bool,
}
impl MergeLaneKey {
fn for_job(job: &WindowInsertJob, deny_tables: &HashSet<String>) -> Self {
let merge_eligible = job.merge_eligible
&& !super::insert_request_has_update_body(&job.body)
&& !table_in_denylist(&job.table_name, deny_tables);
Self {
client_name: job.client_name.clone(),
table_name: job.table_name.clone(),
column_fingerprint: insert_body_fingerprint(&job.insert_body),
merge_eligible,
}
}
}
/// Builds a stable fingerprint of an insert body's column set: the object's
/// keys, sorted, joined with the ASCII unit separator (U+001F).
/// Any non-object body maps to the empty string.
fn insert_body_fingerprint(body: &Value) -> String {
    match body.as_object() {
        Some(obj) => {
            let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
            keys.sort_unstable();
            keys.join("\u{001f}")
        }
        None => String::new(),
    }
}
/// Returns true when `table` (after trimming) is denylisted, matching either
/// the full (possibly schema-qualified) name or the bare table name after
/// the last `.`.
fn table_in_denylist(table: &str, deny: &HashSet<String>) -> bool {
    if deny.is_empty() {
        return false;
    }
    let trimmed = table.trim();
    if deny.contains(trimmed) {
        return true;
    }
    // "schema.table" also matches a denylist entry for just "table".
    match trimmed.rsplit_once('.') {
        Some((_, short)) => deny.contains(short),
        None => false,
    }
}
/// A submitted job paired with its precomputed lane key and the oneshot
/// channel its waiting caller listens on for the final outcome.
struct Queued {
    lane: MergeLaneKey,
    job: WindowInsertJob,
    /// Exactly one outcome is sent per job; send errors are ignored
    /// (receiver may have gone away).
    response_tx: oneshot::Sender<super::WindowInsertOutcome>,
}
/// Tunables for the insert window. Normalized by `normalize_settings`
/// before use: batch clamped to [1, 10_000], queue to [2*batch, 1_000_000].
#[derive(Clone)]
pub struct InsertWindowSettings {
    /// Maximum rows per bulk insert chunk.
    pub max_batch: usize,
    /// Admission limit: submissions beyond this pending depth are rejected.
    pub max_queued: usize,
    /// Tables excluded from merging (short or schema-qualified names).
    pub deny_tables: HashSet<String>,
}
/// Shared coordinator state, held behind an `Arc` by both the public
/// handle and the spawned worker task.
struct Inner {
    settings: InsertWindowSettings,
    /// Pending queue, guarded by tokio's async mutex.
    state: Mutex<State>,
    /// Wakes the worker when a new job is admitted.
    notify: Notify,
    /// Set once by `bind_app_state`; the worker polls until it is present.
    app_state: OnceLock<Data<AppState>>,
}
/// Mutable queue state: all admitted-but-not-yet-flushed jobs.
struct State {
    pending: Vec<Queued>,
}
/// Public handle to the insert batching window; owns a reference to the
/// shared state that the background worker also holds.
pub struct InsertWindowCoordinator {
    inner: Arc<Inner>,
}
/// Clamps user-supplied window settings into safe operating bounds:
/// batch size into [1, 10_000] and queue depth into [2*batch, 1_000_000].
fn normalize_settings(settings: InsertWindowSettings) -> InsertWindowSettings {
    let InsertWindowSettings {
        max_batch,
        max_queued,
        deny_tables,
    } = settings;
    let max_batch = max_batch.clamp(1, 10_000);
    // The queue must hold at least two full batches. Since 2 * 10_000 is
    // far below the 1_000_000 cap, `clamp` can never see an inverted range.
    let max_queued = max_queued.clamp(max_batch.saturating_mul(2), 1_000_000);
    InsertWindowSettings {
        max_batch,
        max_queued,
        deny_tables,
    }
}
/// Picks how many queued rows the next bulk insert should take.
///
/// A heavy backlog (>= 4x the batch size) flushes full batches; a moderate
/// backlog (>= 2x) halves the batch to smooth database load; anything
/// smaller just flushes whatever remains.
fn adaptive_chunk_len(max_batch: usize, remaining: usize) -> usize {
    // Guard against max_batch == 0 so the halved branch still returns >= 1.
    let batch = max_batch.max(1);
    match remaining {
        r if r >= batch.saturating_mul(4) => batch,
        r if r >= batch.saturating_mul(2) => (batch / 2).max(1),
        r => batch.min(r),
    }
}
impl InsertWindowCoordinator {
    /// Builds a coordinator with normalized settings. The background worker
    /// is NOT started here; it is spawned by the first `bind_app_state` call.
    pub fn new(settings: InsertWindowSettings) -> Arc<Self> {
        let settings = normalize_settings(settings);
        let inner: Arc<Inner> = Arc::new(Inner {
            settings,
            state: Mutex::new(State {
                pending: Vec::new(),
            }),
            notify: Notify::new(),
            app_state: OnceLock::new(),
        });
        Arc::new(Self { inner })
    }

    /// Installs the application state and spawns the worker loop.
    /// Idempotent: if the `OnceLock` was already set, this returns without
    /// spawning a second worker.
    pub fn bind_app_state(&self, app_state: Data<AppState>) {
        if self.inner.app_state.set(app_state).is_err() {
            return;
        }
        let inner: Arc<Inner> = self.inner.clone();
        tokio::spawn(worker_loop(inner));
    }

    /// Admits `job` into the pending window, or rejects it when the queue is
    /// at capacity. The caller's `response_tx` always receives an outcome:
    /// later from the worker on the success path, or immediately here on
    /// rejection.
    pub(crate) async fn submit(
        &self,
        job: WindowInsertJob,
        response_tx: oneshot::Sender<super::WindowInsertOutcome>,
    ) {
        // Lane is computed outside the lock; it only reads the job.
        let lane: MergeLaneKey = MergeLaneKey::for_job(&job, &self.inner.settings.deny_tables);
        let app_opt: Option<Data<AppState>> = self.inner.app_state.get().cloned();
        {
            let mut st: MutexGuard<'_, State> = self.inner.state.lock().await;
            if st.pending.len() < self.inner.settings.max_queued {
                st.pending.push(Queued {
                    lane,
                    job,
                    response_tx,
                });
                let depth = st.pending.len();
                // Drop the lock before notifying/metrics to keep the
                // critical section minimal.
                drop(st);
                self.inner.notify.notify_one();
                if let Some(app) = &app_opt {
                    app.metrics_state
                        .record_gateway_insert_window_row_count("rows_submitted", 1);
                    app.metrics_state
                        .record_gateway_insert_window_queue_depth(depth as u64);
                }
                return;
            }
        }
        // Queue full. Without bound app state we cannot build the overload
        // outcome (or record metrics), so answer with an internal error.
        let Some(app) = app_opt else {
            let _ = response_tx.send(super::window_insert_internal_error(
                "Insert window coordinator is not initialized",
            ));
            return;
        };
        app.metrics_state
            .record_gateway_insert_window_event("admission_reject_queue_full");
        let _ = response_tx.send(super::window_insert_overloaded_outcome(
            &job,
            "Insert queue is full; request rejected to protect database capacity",
            "window_queue_full",
        ));
    }

    /// Current pending-queue length (takes the state lock briefly).
    pub(crate) async fn pending_queue_len(&self) -> usize {
        let st: MutexGuard<'_, State> = self.inner.state.lock().await;
        st.pending.len()
    }

    /// Admission capacity configured at construction (post-normalization).
    pub(crate) fn max_queued(&self) -> usize {
        self.inner.settings.max_queued
    }
}
/// Background flush task: sleeps until the earliest queued deadline (or a
/// new-submission notification), drains everything that is due, groups it
/// by merge lane, and flushes each lane group.
async fn worker_loop(inner: Arc<Inner>) {
    loop {
        // App state is bound shortly after startup; poll until it appears.
        let Some(app) = inner.app_state.get().cloned() else {
            tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
            continue;
        };
        // Earliest `due` instant among pending jobs, if any.
        let next_deadline: Option<Instant> = {
            let st: MutexGuard<'_, State> = inner.state.lock().await;
            st.pending.iter().map(|q| q.job.due).min()
        };
        if next_deadline.is_none() {
            // Queue empty: park until submit() notifies.
            inner.notify.notified().await;
            continue;
        }
        // Either the deadline fires (fall through and drain), or a new job
        // arrives and we loop back to recompute — it may be due earlier
        // than the deadline we were sleeping toward.
        tokio::select! {
            _ = tokio::time::sleep_until(next_deadline.unwrap()) => {}
            _ = inner.notify.notified() => { continue; }
        }
        // Partition pending into due-now vs still-waiting, keeping the
        // latter queued.
        let due_batch: Vec<Queued> = {
            let mut st: MutexGuard<'_, State> = inner.state.lock().await;
            let now: Instant = Instant::now();
            let mut remain: Vec<Queued> = Vec::new();
            let mut ready: Vec<Queued> = Vec::new();
            for q in st.pending.drain(..) {
                if q.job.due <= now {
                    ready.push(q);
                } else {
                    remain.push(q);
                }
            }
            st.pending = remain;
            ready
        };
        if due_batch.is_empty() {
            continue;
        }
        // NOTE(review): this gauge is fed the drained batch size, not the
        // depth of what remains queued — confirm that is intentional.
        app.metrics_state
            .record_gateway_insert_window_queue_depth(due_batch.len() as u64);
        // Group by lane so only identically-shaped inserts flush together.
        let mut by_lane: HashMap<MergeLaneKey, Vec<Queued>> = HashMap::new();
        for q in due_batch {
            by_lane.entry(q.lane.clone()).or_default().push(q);
        }
        let settings: InsertWindowSettings = inner.settings.clone();
        // Lanes flush sequentially; each group answers its own waiters.
        for (lane, group) in by_lane {
            process_lane_group(app.clone(), lane, group, &settings).await;
        }
    }
}
/// Flushes one lane's group of due jobs.
///
/// Paths, in order:
///  1. single job, or merge-ineligible lane  -> per-row inserts;
///  2. no pool registered for the client     -> per-row inserts;
///  3. otherwise adaptive chunks through `insert_rows_bulk`, with a
///     duplicate pre-filter; a SQL error or a returned-row-count mismatch
///     degrades that chunk back to per-row inserts.
/// Every queued job's `response_tx` receives exactly one outcome.
async fn process_lane_group(
    app: Data<AppState>,
    lane: MergeLaneKey,
    mut group: Vec<Queued>,
    settings: &InsertWindowSettings,
) {
    let batch_started = std::time::Instant::now();
    let group_size = group.len();
    // Fast path: no batching benefit — run each insert individually.
    if group.len() == 1 || !lane.merge_eligible {
        app.metrics_state
            .record_gateway_insert_window_event("flush_single");
        app.metrics_state
            .record_gateway_insert_window_row_count("rows_path_single", group.len() as u64);
        for q in group.drain(..) {
            let outcome: super::WindowInsertOutcome =
                super::run_postgres_insert_to_outcome(app.clone(), q.job).await;
            let _ = q.response_tx.send(outcome);
        }
        return;
    }
    // Bulk inserts need the client's pool directly; without one, fall back
    // to the per-row path (which resolves its own connection).
    let pool_opt: Option<Pool<Postgres>> = app.pg_registry.get_pool(&lane.client_name);
    let Some(pool) = pool_opt else {
        app.metrics_state
            .record_gateway_insert_window_event("flush_single_no_pool");
        app.metrics_state
            .record_gateway_insert_window_row_count("rows_path_single", group.len() as u64);
        for q in group.drain(..) {
            let outcome: super::WindowInsertOutcome =
                super::run_postgres_insert_to_outcome(app.clone(), q.job).await;
            let _ = q.response_tx.send(outcome);
        }
        return;
    };
    // Drain the group in adaptively-sized chunks.
    while !group.is_empty() {
        let chunk_len: usize = adaptive_chunk_len(settings.max_batch, group.len());
        if chunk_len < settings.max_batch {
            app.metrics_state
                .record_gateway_insert_window_event("adaptive_chunk_reduced");
        }
        let chunk: Vec<Queued> = group.drain(..chunk_len).collect();
        // Record how long each job waited between admission and flush.
        for queued in &chunk {
            app.metrics_state.record_gateway_insert_phase_duration(
                "queue_wait",
                queued.job.operation_start.elapsed().as_secs_f64(),
            );
        }
        // Pre-filter: answer early for rows whose signature recently hit a
        // unique violation, and (when enabled) for duplicate signatures
        // within this same chunk.
        let dedupe_enabled: bool = super::insert_window_dedupe_enabled();
        let mut seen_signatures: HashSet<String> = HashSet::new();
        let mut filtered_chunk: Vec<Queued> = Vec::with_capacity(chunk.len());
        let dedupe_started = std::time::Instant::now();
        for q in chunk {
            // Option wrapper lets the skip branches move the item out while
            // the borrow for signature building has already ended.
            let mut queued: Option<Queued> = Some(q);
            let mut should_skip: bool = false;
            if let Some(signature) = super::build_insert_duplicate_signature(
                &lane.client_name,
                &lane.table_name,
                &queued
                    .as_ref()
                    .expect("queued item must exist")
                    .job
                    .insert_body,
            ) {
                if let Some(cached_constraint) = super::lookup_recent_unique_violation(&signature) {
                    // Known-recent conflict: answer without touching the DB.
                    app.metrics_state
                        .record_gateway_insert_window_event("gateway_recent_conflict_cache_hit");
                    if let Some(queued) = queued.take() {
                        let _ = queued.response_tx.send(
                            super::window_prefilter_unique_violation_outcome(
                                &queued.job,
                                cached_constraint.as_deref(),
                                "recent_unique_violation_cache_window",
                            ),
                        );
                    }
                    should_skip = true;
                } else if dedupe_enabled && !seen_signatures.insert(signature) {
                    // Second occurrence of the same signature in this chunk.
                    app.metrics_state
                        .record_gateway_insert_window_event("gateway_window_dedupe");
                    if let Some(queued) = queued.take() {
                        let _ = queued.response_tx.send(
                            super::window_prefilter_unique_violation_outcome(
                                &queued.job,
                                None,
                                "window_chunk_duplicate",
                            ),
                        );
                    }
                    should_skip = true;
                }
            }
            if !should_skip && let Some(queued) = queued {
                filtered_chunk.push(queued);
            }
        }
        app.metrics_state.record_gateway_insert_phase_duration(
            "dedupe_check",
            dedupe_started.elapsed().as_secs_f64(),
        );
        if filtered_chunk.is_empty() {
            continue;
        }
        let payloads: Vec<Value> = filtered_chunk
            .iter()
            .map(|q| q.job.insert_body.clone())
            .collect();
        let db_insert_started = std::time::Instant::now();
        // NOTE(review): this sits after the `filtered_chunk.is_empty()`
        // continue above, so a chunk skipped in its entirety never records
        // `rows_deduped`; the count also includes conflict-cache hits, not
        // only in-chunk duplicates — confirm both are intended.
        let deduped_count = (chunk_len as u64).saturating_sub(filtered_chunk.len() as u64);
        if deduped_count > 0 {
            app.metrics_state
                .record_gateway_insert_window_row_count("rows_deduped", deduped_count);
        }
        app.metrics_state
            .record_gateway_insert_window_event("db_calls_bulk");
        app.metrics_state
            .record_gateway_insert_window_batch_size(filtered_chunk.len() as u64);
        match insert_rows_bulk(&pool, &lane.table_name, &payloads).await {
            Ok(rows) => {
                app.metrics_state.record_gateway_insert_phase_duration(
                    "db_insert",
                    db_insert_started.elapsed().as_secs_f64(),
                );
                // Returned rows must pair 1:1 with the submitted payloads;
                // otherwise we cannot attribute rows to waiters, so re-run
                // each insert individually.
                if rows.len() != filtered_chunk.len() {
                    app.metrics_state
                        .record_gateway_insert_window_event("bulk_row_count_mismatch");
                    app.metrics_state.record_gateway_insert_window_row_count(
                        "rows_path_single",
                        filtered_chunk.len() as u64,
                    );
                    for q in filtered_chunk {
                        let outcome: super::WindowInsertOutcome =
                            super::run_postgres_insert_to_outcome(app.clone(), q.job).await;
                        let _ = q.response_tx.send(outcome);
                    }
                    continue;
                }
                app.metrics_state
                    .record_gateway_insert_window_event("flush_bulk");
                app.metrics_state
                    .record_gateway_insert_window_event("db_calls_bulk_ok");
                app.metrics_state
                    .record_gateway_insert_window_row_count("rows_path_bulk", rows.len() as u64);
                // One scoped invalidation covers the whole chunk if any row
                // requires it.
                let mut any_invalidate: bool = false;
                for row in &rows {
                    if super::should_invalidate_cache_after_insert(row) {
                        any_invalidate = true;
                        break;
                    }
                }
                if any_invalidate {
                    let _ = invalidate_scoped_gateway_cache(
                        app.clone(),
                        &lane.client_name,
                        &lane.table_name,
                    )
                    .await;
                }
                app.metrics_state
                    .record_gateway_postgres_backend("/gateway/insert", "sqlx");
                // Pair each returned row with its waiter, in order.
                for (q, row) in filtered_chunk.into_iter().zip(rows.into_iter()) {
                    let outcome = super::finish_postgres_insert_success_json_with_recovery(
                        app.clone(),
                        &q.job,
                        row,
                    )
                    .await;
                    let _ = q.response_tx.send(outcome);
                }
            }
            Err(_err) => {
                // Bulk statement failed (e.g. one bad row poisons the whole
                // INSERT); retry each row individually so per-row errors get
                // attributed to the right waiter.
                app.metrics_state.record_gateway_insert_phase_duration(
                    "db_insert",
                    db_insert_started.elapsed().as_secs_f64(),
                );
                app.metrics_state
                    .record_gateway_insert_window_event("bulk_fallback_sql_error");
                app.metrics_state
                    .record_gateway_insert_window_event("db_calls_bulk_err");
                app.metrics_state.record_gateway_insert_window_row_count(
                    "rows_path_single",
                    filtered_chunk.len() as u64,
                );
                for q in filtered_chunk {
                    let outcome: super::WindowInsertOutcome =
                        super::run_postgres_insert_to_outcome(app.clone(), q.job).await;
                    let _ = q.response_tx.send(outcome);
                }
            }
        }
    }
    // Slow-batch visibility: flushing includes per-row fallbacks, cache
    // invalidation, and recovery hooks, any of which can stall.
    let batch_duration = batch_started.elapsed();
    if batch_duration.as_millis() > 10000 {
        tracing::warn!(
            lane_client = %lane.client_name,
            lane_table = %lane.table_name,
            group_size = group_size,
            batch_duration_ms = batch_duration.as_millis(),
            "Slow batch processing (>10s) - possible audit log, webhook, or cache invalidation delay"
        );
    } else if batch_duration.as_millis() > 5000 {
        tracing::info!(
            lane_client = %lane.client_name,
            lane_table = %lane.table_name,
            group_size = group_size,
            batch_duration_ms = batch_duration.as_millis(),
            "Batch processing took >5s"
        );
    }
}
#[cfg(test)]
mod tests {
    use super::{
        InsertWindowSettings, adaptive_chunk_len, insert_body_fingerprint, normalize_settings,
        table_in_denylist,
    };
    use serde_json::{Value, json};
    use std::collections::HashSet;

    #[test]
    fn insert_body_fingerprint_sorts_object_keys() {
        // Keys are emitted sorted regardless of insertion order.
        let unsorted: Value = json!({ "b": 1, "a": 2 });
        assert_eq!(insert_body_fingerprint(&unsorted), "a\u{001f}b");
    }

    #[test]
    fn insert_body_fingerprint_non_object_empty() {
        // Non-object bodies collapse to the empty fingerprint.
        assert_eq!(insert_body_fingerprint(&json!([])), "");
    }

    #[test]
    fn denylist_matches_short_name_and_schema_qualified() {
        let deny: HashSet<String> = ["items".to_string()].into_iter().collect();
        assert!(table_in_denylist("items", &deny));
        assert!(table_in_denylist("public.items", &deny));
        assert!(!table_in_denylist("other", &deny));
    }

    #[test]
    fn normalize_settings_clamps_batch_and_queue_limits() {
        // Lower bounds: batch floors at 1, queue at twice the batch.
        let low = normalize_settings(InsertWindowSettings {
            max_batch: 0,
            max_queued: 1,
            deny_tables: HashSet::new(),
        });
        assert_eq!((low.max_batch, low.max_queued), (1, 2));
        // Upper bounds: batch caps at 10k, queue at 1M.
        let high = normalize_settings(InsertWindowSettings {
            max_batch: 50_000,
            max_queued: 2_000_000,
            deny_tables: HashSet::new(),
        });
        assert_eq!((high.max_batch, high.max_queued), (10_000, 1_000_000));
    }

    #[test]
    fn adaptive_chunk_len_reduces_batch_under_moderate_backlog() {
        // (max_batch, remaining) -> expected chunk length.
        let cases = [
            (100, 500, 100),
            (100, 250, 50),
            (100, 120, 100),
            (100, 40, 40),
            (0, 5, 1),
        ];
        for (max_batch, remaining, expected) in cases {
            assert_eq!(adaptive_chunk_len(max_batch, remaining), expected);
        }
    }
}