#![allow(dead_code)]
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use sqlx::SqlitePool;
use tokio::sync::{broadcast, mpsc};
use ff_core::engine_error::EngineError;
use crate::pubsub::OutboxEvent;
const STREAM_CAPACITY: usize = 1024;
pub(crate) type RowDecoder<T> =
Box<dyn Fn(&sqlx::sqlite::SqliteRow) -> Result<Option<T>, EngineError> + Send + Sync>;
pub(crate) struct OutboxCursorConfig<T> {
pub pool: SqlitePool,
pub select_sql: &'static str,
pub partition_key: i64,
pub cursor: i64,
pub batch_size: i64,
pub wakeup: broadcast::Receiver<OutboxEvent>,
pub decoder: RowDecoder<T>,
pub row_event_id: fn(&sqlx::sqlite::SqliteRow) -> Result<i64, EngineError>,
}
pub(crate) struct OutboxCursorStream<T> {
rx: mpsc::Receiver<Result<T, EngineError>>,
}
impl<T> Stream for OutboxCursorStream<T> {
type Item = Result<T, EngineError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
}
}
pub(crate) fn spawn<T>(config: OutboxCursorConfig<T>) -> OutboxCursorStream<T>
where
T: Send + 'static,
{
let (tx, rx) = mpsc::channel::<Result<T, EngineError>>(STREAM_CAPACITY);
tokio::spawn(reader_loop(config, tx));
OutboxCursorStream { rx }
}
async fn reader_loop<T>(mut config: OutboxCursorConfig<T>, tx: mpsc::Sender<Result<T, EngineError>>)
where
T: Send + 'static,
{
if !replay(&mut config, &tx).await {
return;
}
loop {
if tx.is_closed() {
return;
}
let wake = config.wakeup.recv().await;
match wake {
Ok(_ev) => {
if !replay(&mut config, &tx).await {
return;
}
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::debug!(
table.sql = config.select_sql,
skipped = skipped,
"sqlite.outbox_cursor: broadcast lagged; falling back to cursor-select"
);
if !replay(&mut config, &tx).await {
return;
}
}
Err(broadcast::error::RecvError::Closed) => {
let _ = replay(&mut config, &tx).await;
return;
}
}
}
}
async fn replay<T>(
config: &mut OutboxCursorConfig<T>,
tx: &mpsc::Sender<Result<T, EngineError>>,
) -> bool {
loop {
let rows = match sqlx::query(config.select_sql)
.bind(config.partition_key)
.bind(config.cursor)
.bind(config.batch_size)
.fetch_all(&config.pool)
.await
{
Ok(rows) => rows,
Err(e) => {
let _ = tx.send(Err(crate::errors::map_sqlx_error(e))).await;
return false;
}
};
if rows.is_empty() {
return !tx.is_closed();
}
for row in &rows {
let event_id = match (config.row_event_id)(row) {
Ok(id) => id,
Err(e) => {
let _ = tx.send(Err(e)).await;
return false;
}
};
config.cursor = event_id;
match (config.decoder)(row) {
Ok(Some(item)) => {
if tx.send(Ok(item)).await.is_err() {
return false; }
}
Ok(None) => {
continue;
}
Err(e) => {
let _ = tx.send(Err(e)).await;
return false;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SqliteBackend;
use crate::queries::stream as q_stream;
use ff_core::backend::{CapabilitySet, ClaimPolicy, Frame, FrameKind};
use ff_core::engine_backend::EngineBackend;
use ff_core::types::{ExecutionId, LaneId, WorkerId, WorkerInstanceId};
use serial_test::serial;
use sqlx::Row;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
async fn fresh_backend() -> Arc<SqliteBackend> {
unsafe {
std::env::set_var("FF_DEV_MODE", "1");
}
use std::time::{SystemTime, UNIX_EPOCH};
let ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tid = std::thread::current().id();
let tag = format!("{ns}-{tid:?}").replace([':', ' '], "-");
let uri = format!("file:rfc-023-outbox-cursor-{tag}?mode=memory&cache=shared");
SqliteBackend::new(&uri).await.expect("construct backend")
}
async fn seed_and_claim(backend: &SqliteBackend) -> (ExecutionId, ff_core::backend::Handle) {
let pool = backend.pool_for_test();
let exec_uuid = Uuid::new_v4();
let exec_id = ExecutionId::parse(&format!("{{fp:0}}:{exec_uuid}")).expect("exec id");
sqlx::query(
r#"
INSERT INTO ff_exec_core (
partition_key, execution_id, lane_id, attempt_index,
lifecycle_phase, ownership_state, eligibility_state,
public_state, attempt_state, priority, created_at_ms
) VALUES (0, ?1, 'default', 0,
'runnable', 'unowned', 'eligible_now',
'pending', 'initial', 0, 1)
"#,
)
.bind(exec_uuid)
.execute(pool)
.await
.expect("seed exec_core");
let caps = CapabilitySet::new::<_, &str>([]);
let h = backend
.claim(
&LaneId::new("default"),
&caps,
ClaimPolicy::new(
WorkerId::new("w"),
WorkerInstanceId::new("wi"),
30_000,
None,
),
)
.await
.expect("claim")
.expect("handle");
(exec_id, h)
}
#[derive(Debug, Clone)]
struct TestFrameEvent {
event_id: i64,
ts_ms: i64,
seq: i64,
payload: String,
}
fn test_decoder() -> RowDecoder<TestFrameEvent> {
Box::new(
|row: &sqlx::sqlite::SqliteRow| -> Result<Option<TestFrameEvent>, EngineError> {
let event_id: i64 =
row.try_get("event_id")
.map_err(|e| EngineError::Validation {
kind: ff_core::engine_error::ValidationKind::Corruption,
detail: format!("event_id: {e}"),
})?;
let ts_ms: i64 = row.try_get("ts_ms").map_err(|e| EngineError::Validation {
kind: ff_core::engine_error::ValidationKind::Corruption,
detail: format!("ts_ms: {e}"),
})?;
let seq: i64 = row.try_get("seq").map_err(|e| EngineError::Validation {
kind: ff_core::engine_error::ValidationKind::Corruption,
detail: format!("seq: {e}"),
})?;
let fields_text: String = row.try_get("fields").unwrap_or_default();
let payload: String = serde_json::from_str::<serde_json::Value>(&fields_text)
.ok()
.and_then(|v| {
v.get("payload")
.and_then(|p| p.as_str().map(ToOwned::to_owned))
})
.unwrap_or_default();
Ok(Some(TestFrameEvent {
event_id,
ts_ms,
seq,
payload,
}))
},
)
}
fn test_row_event_id(row: &sqlx::sqlite::SqliteRow) -> Result<i64, EngineError> {
row.try_get("event_id")
.map_err(|e| EngineError::Validation {
kind: ff_core::engine_error::ValidationKind::Corruption,
detail: format!("event_id: {e}"),
})
}
async fn drain_n(
mut stream: OutboxCursorStream<TestFrameEvent>,
n: usize,
timeout: Duration,
) -> Vec<TestFrameEvent> {
use futures_core::stream::Stream as _;
use std::future::poll_fn;
use std::pin::Pin;
let mut out = Vec::with_capacity(n);
let deadline = tokio::time::Instant::now() + timeout;
while out.len() < n {
let remaining = deadline
.checked_duration_since(tokio::time::Instant::now())
.unwrap_or(Duration::ZERO);
if remaining.is_zero() {
break;
}
let poll_once = poll_fn(|cx| Pin::new(&mut stream).poll_next(cx));
match tokio::time::timeout(remaining, poll_once).await {
Ok(Some(Ok(ev))) => out.push(ev),
Ok(Some(Err(e))) => panic!("stream yielded error: {e:?}"),
Ok(None) => break,
Err(_) => break,
}
}
out
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn outbox_cursor_catch_up_then_live() {
let backend = fresh_backend().await;
let (_eid, h) = seed_and_claim(&backend).await;
backend
.append_frame(&h, Frame::new(b"pre-1".to_vec(), FrameKind::Stdout))
.await
.expect("append pre-1");
backend
.append_frame(&h, Frame::new(b"pre-2".to_vec(), FrameKind::Stdout))
.await
.expect("append pre-2");
let pool = backend.pool_for_test().clone();
let rx = backend.stream_frame_receiver_for_test();
let stream = spawn(OutboxCursorConfig {
pool,
select_sql: q_stream::OUTBOX_TAIL_STREAM_FRAME_SQL,
partition_key: 0,
cursor: 0,
batch_size: 64,
wakeup: rx,
decoder: test_decoder(),
row_event_id: test_row_event_id,
});
let backend2 = backend.clone();
let h2 = h.clone();
let producer = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(40)).await;
backend2
.append_frame(&h2, Frame::new(b"live".to_vec(), FrameKind::Stdout))
.await
.expect("append live");
});
let collected = drain_n(stream, 3, Duration::from_millis(2_000)).await;
producer.await.expect("producer");
assert_eq!(collected.len(), 3, "expected 3 events, got {collected:?}");
let payloads: Vec<&str> = collected.iter().map(|e| e.payload.as_str()).collect();
assert_eq!(payloads, vec!["pre-1", "pre-2", "live"]);
for pair in collected.windows(2) {
assert!(
pair[1].event_id > pair[0].event_id,
"event_id not monotonic: {:?}",
pair
);
}
assert!(collected[0].ts_ms > 0);
let _ = collected[0].seq; }
#[tokio::test]
#[serial(ff_dev_mode)]
async fn outbox_cursor_resume_skips_seen() {
let backend = fresh_backend().await;
let (_eid, h) = seed_and_claim(&backend).await;
backend
.append_frame(&h, Frame::new(b"one".to_vec(), FrameKind::Stdout))
.await
.expect("append 1");
backend
.append_frame(&h, Frame::new(b"two".to_vec(), FrameKind::Stdout))
.await
.expect("append 2");
let pool = backend.pool_for_test();
let first_id: i64 =
sqlx::query_scalar("SELECT MIN(_rowid_) FROM ff_stream_frame WHERE partition_key = 0")
.fetch_one(pool)
.await
.expect("min rowid");
let rx = backend.stream_frame_receiver_for_test();
let stream = spawn(OutboxCursorConfig {
pool: pool.clone(),
select_sql: q_stream::OUTBOX_TAIL_STREAM_FRAME_SQL,
partition_key: 0,
cursor: first_id,
batch_size: 64,
wakeup: rx,
decoder: test_decoder(),
row_event_id: test_row_event_id,
});
let collected = drain_n(stream, 1, Duration::from_millis(500)).await;
assert_eq!(collected.len(), 1);
assert_eq!(collected[0].payload, "two");
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn outbox_cursor_clean_shutdown_on_drop() {
let backend = fresh_backend().await;
let (_eid, _h) = seed_and_claim(&backend).await;
let pool = backend.pool_for_test().clone();
let rx = backend.stream_frame_receiver_for_test();
let stream = spawn(OutboxCursorConfig::<TestFrameEvent> {
pool,
select_sql: q_stream::OUTBOX_TAIL_STREAM_FRAME_SQL,
partition_key: 0,
cursor: 0,
batch_size: 64,
wakeup: rx,
decoder: test_decoder(),
row_event_id: test_row_event_id,
});
drop(stream);
tokio::time::sleep(Duration::from_millis(50)).await;
let (_eid2, h2) = seed_and_claim(&backend).await;
backend
.append_frame(&h2, Frame::new(b"after-drop".to_vec(), FrameKind::Stdout))
.await
.expect("append post-drop");
}
}