use std::time::Duration;
use crate::db::DbPool;
use crate::error::AppResult;
use crate::nats::EventStreamPublisher;
use crate::state::AppState;
/// Value of the `name` column in `noetl.stream_cursor` under which this tailer stores its position.
const CURSOR_NAME: &str = "event_stream_tailer";
/// Runtime configuration for the event-stream tailer, read from `NOETL_EVENT_STREAM_*` keys.
#[derive(Debug, Clone)]
pub struct EventStreamConfig {
    /// Master switch (`NOETL_EVENT_STREAM_ENABLED`); off unless explicitly enabled.
    pub enabled: bool,
    /// Maximum number of `noetl.event` rows fetched per poll (`NOETL_EVENT_STREAM_BATCH`,
    /// default 500). Always at least 1.
    pub batch_size: i64,
    /// Sleep between polls when caught up or after a failed batch
    /// (`NOETL_EVENT_STREAM_POLL_MS`, default 500 ms).
    pub poll_interval: Duration,
    /// When no cursor is stored yet, start from the beginning of `noetl.event` instead of the
    /// current newest event (`NOETL_EVENT_STREAM_BACKFILL`).
    pub backfill: bool,
    /// Dedup window handed to `EventStreamPublisher::new` (`NOETL_EVENT_STREAM_DEDUP_SECS`,
    /// default 120 s).
    pub dedup_window: Duration,
    /// Max age handed to `EventStreamPublisher::new` (`NOETL_EVENT_STREAM_RETENTION_SECS`,
    /// default 86 400 s).
    pub max_age: Duration,
}

impl EventStreamConfig {
    /// Reads the configuration from the process environment.
    pub fn from_env() -> Self {
        Self::from_lookup(|k| std::env::var(k).ok())
    }

    /// Builds the configuration from an arbitrary key lookup, which lets tests inject values.
    ///
    /// Flags are true only for `true`, `1`, `yes` or `on`. Matching is case-insensitive and
    /// ignores surrounding whitespace; any other value, or absence, means `false`. Numeric
    /// keys that are missing or do not parse as `u64` use their defaults.
    pub fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Self {
        const DEFAULT_BATCH: u64 = 500;

        let flag = |key: &str| {
            lookup(key)
                .map(|v| {
                    let v = v.trim().to_ascii_lowercase();
                    v == "true" || v == "1" || v == "yes" || v == "on"
                })
                .unwrap_or(false)
        };
        let num = |key: &str, default: u64| {
            lookup(key)
                .and_then(|v| v.trim().parse().ok())
                .unwrap_or(default)
        };

        // A batch size of 0 becomes `LIMIT 0`: every poll would come back empty and the
        // tailer would silently never advance, so treat 0 like "unset".
        let batch = num("NOETL_EVENT_STREAM_BATCH", DEFAULT_BATCH);
        let batch = if batch == 0 { DEFAULT_BATCH } else { batch };

        Self {
            enabled: flag("NOETL_EVENT_STREAM_ENABLED"),
            // Saturate rather than let `as` wrap values above i64::MAX to a negative LIMIT.
            batch_size: batch.min(i64::MAX as u64) as i64,
            poll_interval: Duration::from_millis(num("NOETL_EVENT_STREAM_POLL_MS", 500)),
            backfill: flag("NOETL_EVENT_STREAM_BACKFILL"),
            dedup_window: Duration::from_secs(num("NOETL_EVENT_STREAM_DEDUP_SECS", 120)),
            max_age: Duration::from_secs(num("NOETL_EVENT_STREAM_RETENTION_SECS", 86_400)),
        }
    }
}
/// Creates `noetl.stream_cursor` when it does not exist yet.
///
/// The table is keyed by cursor `name` and records each cursor's `position` together with
/// the time it was last written (`updated_at`).
pub async fn ensure_cursor_table(pool: &DbPool) -> AppResult<()> {
    let create_table = sqlx::query(
        r#"
CREATE TABLE IF NOT EXISTS noetl.stream_cursor (
name TEXT PRIMARY KEY,
position BIGINT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
"#,
    );
    let _outcome = create_table.execute(pool).await?;
    Ok(())
}
/// Returns the tailer's persisted cursor, creating and storing one on first run.
///
/// If `noetl.stream_cursor` has no row for this tailer yet, the starting position depends on
/// `backfill`:
/// - set: `0`, so every existing event is tailed;
/// - unset: the current `MAX(event_id)` of `noetl.event` (`0` for an empty table), so only
///   newer events are tailed.
///
/// That starting position is saved before it is returned.
async fn load_or_init_cursor(pool: &DbPool, backfill: bool) -> AppResult<i64> {
    let stored = sqlx::query_as::<_, (i64,)>(
        "SELECT position FROM noetl.stream_cursor WHERE name = $1",
    )
    .bind(CURSOR_NAME)
    .fetch_optional(pool)
    .await?;
    if let Some((position,)) = stored {
        return Ok(position);
    }

    let initial = if !backfill {
        let (newest,) =
            sqlx::query_as::<_, (Option<i64>,)>("SELECT MAX(event_id) FROM noetl.event")
                .fetch_one(pool)
                .await?;
        newest.unwrap_or(0)
    } else {
        0
    };
    save_cursor(pool, initial).await?;
    Ok(initial)
}
/// Upserts this tailer's row in `noetl.stream_cursor` to `position` and refreshes
/// `updated_at`.
async fn save_cursor(pool: &DbPool, position: i64) -> AppResult<()> {
    const UPSERT: &str = r#"
INSERT INTO noetl.stream_cursor (name, position, updated_at)
VALUES ($1, $2, now())
ON CONFLICT (name) DO UPDATE SET position = EXCLUDED.position, updated_at = now()
"#;
    let _outcome = sqlx::query(UPSERT)
        .bind(CURSOR_NAME)
        .bind(position)
        .execute(pool)
        .await?;
    Ok(())
}
/// One row read from `noetl.event` by `publish_batch`.
///
/// sqlx's `FromRow` derive matches columns by field name, so the query aliases its
/// columns to these names; declaration order does not matter for decoding.
#[derive(sqlx::FromRow)]
struct TailRow {
    /// Event type, with SQL NULL replaced by `'unknown'` in the query.
    event_type: String,
    /// Event primary key; the tailer's cursor advances over these values.
    event_id: i64,
    /// Entire event row as JSON (`to_jsonb(e)`); this is what gets serialized and published.
    payload: serde_json::Value,
}
/// Starts the background task that tails `noetl.event` and publishes each row to NATS.
///
/// Returns immediately. Only logs and does nothing else when `config.enabled` is false or
/// `state.nats` is `None`. Otherwise it spawns a detached tokio task that:
/// 1. builds an `EventStreamPublisher` with the configured dedup window and max age;
/// 2. ensures `noetl.stream_cursor` exists and loads (or initialises) the cursor;
/// 3. loops forever, publishing batches of events newer than the cursor and persisting the
///    advanced cursor.
///
/// A failure in any setup step is logged and ends the task. Inside the loop, a failed batch
/// is logged and retried after `config.poll_interval`. A failed cursor save is retried on
/// every subsequent iteration until it succeeds.
pub fn spawn_event_stream_tailer(state: AppState, config: EventStreamConfig) {
    if !config.enabled {
        tracing::info!(
            target: "noetl_server::startup",
            "event-stream tailer disabled (NOETL_EVENT_STREAM_ENABLED unset) — CQRS write path inert"
        );
        return;
    }
    let Some(client) = state.nats.clone() else {
        tracing::warn!(
            target: "noetl_server::startup",
            "event-stream tailer enabled but NATS is not connected — producer cannot run"
        );
        return;
    };
    tokio::spawn(async move {
        let publisher =
            match EventStreamPublisher::new(client, config.dedup_window, config.max_age).await {
                Ok(p) => p,
                Err(e) => {
                    tracing::error!(%e, "event-stream tailer: failed to ensure noetl_events stream; producer not started");
                    return;
                }
            };
        let pool = &state.db;
        if let Err(e) = ensure_cursor_table(pool).await {
            tracing::error!(%e, "event-stream tailer: failed to ensure cursor table; producer not started");
            return;
        }
        let mut cursor = match load_or_init_cursor(pool, config.backfill).await {
            Ok(c) => c,
            Err(e) => {
                tracing::error!(%e, "event-stream tailer: failed to load cursor; producer not started");
                return;
            }
        };
        tracing::info!(
            target: "noetl_server::startup",
            start_cursor = cursor,
            batch = config.batch_size,
            "event-stream tailer started (CQRS write-path producer, #103 phase 2a)"
        );
        // Last cursor value known to be stored in `noetl.stream_cursor`. It starts equal to
        // `cursor` because `load_or_init_cursor` has just read or written it.
        let mut persisted = cursor;
        loop {
            // `true` when the loop should wait before polling again: either caught up, or
            // the batch failed and we back off.
            let should_sleep =
                match publish_batch(pool, &publisher, cursor, config.batch_size).await {
                    Ok(Some(new_cursor)) => {
                        cursor = new_cursor;
                        false
                    }
                    Ok(None) => true,
                    Err(e) => {
                        tracing::warn!(%e, cursor, "event-stream tailer: batch publish failed; backing off");
                        true
                    }
                };
            // Persist whenever the stored cursor lags the in-memory one. Checking on every
            // iteration, not only right after a successful batch, means a failed save really
            // is retried even when the tailer goes idle. Otherwise a restart would replay
            // events that were already published.
            if persisted != cursor {
                match save_cursor(pool, cursor).await {
                    Ok(()) => persisted = cursor,
                    Err(e) => {
                        tracing::warn!(%e, cursor, "event-stream tailer: cursor persist failed; will retry");
                    }
                }
            }
            if should_sleep {
                tokio::time::sleep(config.poll_interval).await;
            }
        }
    });
}
/// Publishes up to `batch_size` events with `event_id > cursor`, in ascending `event_id`
/// order.
///
/// Returns:
/// - `Ok(None)` when there are no newer events;
/// - otherwise `Ok(Some(id))`, where `id` is the highest `event_id` handled (published, or
///   skipped for being oversized). This is the new cursor.
///
/// If a publish fails after at least one row of the batch was handled, the error is logged
/// and the partial progress is returned. Rows already published are therefore not re-sent on
/// every retry. An `Err` is returned only when the query fails or the first row of the batch
/// cannot be handled, so the caller backs off without losing progress.
async fn publish_batch(
    pool: &DbPool,
    publisher: &EventStreamPublisher,
    cursor: i64,
    batch_size: i64,
) -> AppResult<Option<i64>> {
    let rows: Vec<TailRow> = sqlx::query_as(
        r#"
SELECT event_id, COALESCE(event_type, 'unknown') AS event_type, to_jsonb(e) AS payload
FROM noetl.event e
WHERE event_id > $1
ORDER BY event_id ASC
LIMIT $2
"#,
    )
    .bind(cursor)
    .bind(batch_size)
    .fetch_all(pool)
    .await?;
    if rows.is_empty() {
        return Ok(None);
    }
    let mut max_id = cursor;
    for row in &rows {
        if let Err(e) = publish_row(publisher, row).await {
            // Every row has event_id > cursor, so max_id == cursor means nothing in this
            // batch was handled yet: surface the error so the caller backs off.
            if max_id == cursor {
                return Err(e);
            }
            tracing::warn!(
                %e,
                event_id = row.event_id,
                committed = max_id,
                "event-stream tailer: publish failed mid-batch; committing progress so far"
            );
            return Ok(Some(max_id));
        }
        max_id = max_id.max(row.event_id);
    }
    Ok(Some(max_id))
}

/// Handles one tailed row. The row is published unless its encoded payload exceeds
/// `MAX_EVENT_PAYLOAD_BYTES`; in that case it is logged, counted via the skipped metric and
/// treated as handled.
async fn publish_row(publisher: &EventStreamPublisher, row: &TailRow) -> AppResult<()> {
    let bytes = serde_json::to_vec(&row.payload).map_err(|e| {
        crate::error::AppError::Internal(format!("event payload encode: {e}"))
    })?;
    if bytes.len() > MAX_EVENT_PAYLOAD_BYTES {
        tracing::warn!(
            event_id = row.event_id,
            event_type = %row.event_type,
            bytes = bytes.len(),
            "event-stream tailer: payload over max; skipping (recoverable from noetl.event during dual-write)"
        );
        crate::metrics::record_event_stream_skipped(&row.event_type);
        return Ok(());
    }
    publisher
        .publish_event(row.event_id, &row.event_type, &bytes)
        .await
        .map_err(|e| crate::error::AppError::Internal(format!("event publish: {e}")))?;
    crate::metrics::record_event_stream_published(&row.event_type, 1, row.event_id);
    Ok(())
}
/// Largest serialized event payload (900 KiB) the tailer will publish. Larger rows are
/// skipped and counted via `record_event_stream_skipped`.
/// NOTE(review): presumably chosen to stay under the NATS server's `max_payload`
/// (1 MB by default). Confirm against the deployment config.
const MAX_EVENT_PAYLOAD_BYTES: usize = 1024 * 900;
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a lookup that answers only `key`, with `value`.
    fn single(key: &'static str, value: &'static str) -> impl Fn(&str) -> Option<String> {
        move |k: &str| (k == key).then(|| value.to_string())
    }

    #[test]
    fn config_defaults_are_safe() {
        let c = EventStreamConfig::from_lookup(|_| None);
        assert!(!c.enabled, "default must be off");
        assert_eq!(c.batch_size, 500);
        assert_eq!(c.poll_interval, Duration::from_millis(500));
        assert!(!c.backfill);
        assert_eq!(c.dedup_window, Duration::from_secs(120));
        assert_eq!(c.max_age, Duration::from_secs(86_400));
    }

    #[test]
    fn config_parses_overrides() {
        let map = |k: &str| -> Option<String> {
            match k {
                "NOETL_EVENT_STREAM_ENABLED" => Some("yes".into()),
                "NOETL_EVENT_STREAM_BATCH" => Some("1000".into()),
                "NOETL_EVENT_STREAM_POLL_MS" => Some("250".into()),
                "NOETL_EVENT_STREAM_BACKFILL" => Some("true".into()),
                "NOETL_EVENT_STREAM_DEDUP_SECS" => Some("30".into()),
                "NOETL_EVENT_STREAM_RETENTION_SECS" => Some("3600".into()),
                _ => None,
            }
        };
        let c = EventStreamConfig::from_lookup(map);
        assert!(c.enabled);
        assert_eq!(c.batch_size, 1000);
        assert_eq!(c.poll_interval, Duration::from_millis(250));
        assert!(c.backfill);
        assert_eq!(c.dedup_window, Duration::from_secs(30));
        assert_eq!(c.max_age, Duration::from_secs(3600));
    }

    #[test]
    fn flags_accept_only_documented_truthy_spellings() {
        for v in ["1", "TRUE", " on ", "Yes"] {
            let c = EventStreamConfig::from_lookup(single("NOETL_EVENT_STREAM_ENABLED", v));
            assert!(c.enabled, "{v:?} should enable");
        }
        for v in ["0", "false", "enabled", ""] {
            let c = EventStreamConfig::from_lookup(single("NOETL_EVENT_STREAM_ENABLED", v));
            assert!(!c.enabled, "{v:?} should not enable");
        }
    }

    #[test]
    fn unparsable_numbers_fall_back_to_defaults() {
        let c = EventStreamConfig::from_lookup(single("NOETL_EVENT_STREAM_BATCH", "abc"));
        assert_eq!(c.batch_size, 500);
        let c = EventStreamConfig::from_lookup(single("NOETL_EVENT_STREAM_POLL_MS", "-5"));
        assert_eq!(c.poll_interval, Duration::from_millis(500));
    }
}