use ff_core::contracts::{ListLanesPage, ListSuspendedPage, SuspendedExecutionEntry};
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::partition::PartitionKey;
use ff_core::types::{ExecutionId, LaneId};
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
pub(crate) async fn list_lanes_impl(
pool: &PgPool,
cursor: Option<LaneId>,
limit: usize,
) -> Result<ListLanesPage, EngineError> {
if limit == 0 {
return Ok(ListLanesPage::new(Vec::new(), None));
}
let fetch_n: i64 = (limit as i64).saturating_add(1);
let cursor_str = cursor.as_ref().map(|l| l.as_str().to_owned());
let rows = sqlx::query(
"SELECT lane_id FROM ff_lane_registry \
WHERE ($1::text IS NULL OR lane_id > $1) \
ORDER BY lane_id ASC \
LIMIT $2",
)
.bind(cursor_str)
.bind(fetch_n)
.fetch_all(pool)
.await
.map_err(map_sqlx_error)?;
let has_more = rows.len() > limit;
let take = if has_more { limit } else { rows.len() };
let mut lanes: Vec<LaneId> = Vec::with_capacity(take);
for row in rows.iter().take(take) {
let raw: String = row.get("lane_id");
let lane = LaneId::try_new(raw.clone()).map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!(
"list_lanes: ff_lane_registry: lane_id '{raw}' is not a valid LaneId \
(registry corruption?): {e:?}"
),
})?;
lanes.push(lane);
}
let next_cursor = if has_more { lanes.last().cloned() } else { None };
Ok(ListLanesPage::new(lanes, next_cursor))
}
pub(crate) async fn list_suspended_impl(
pool: &PgPool,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
if limit == 0 {
return Ok(ListSuspendedPage::new(Vec::new(), None));
}
let parsed = partition.parse().map_err(|e| EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!("list_suspended: partition: {e}"),
})?;
let partition_idx: i16 = parsed.index as i16;
let (cursor_ms, cursor_uuid) = if let Some(ref c) = cursor {
let uuid = parse_execution_uuid(c, "list_suspended")?;
let row = sqlx::query(
"SELECT suspended_at_ms FROM ff_suspension_current \
WHERE partition_key = $1 AND execution_id = $2",
)
.bind(partition_idx)
.bind(uuid)
.fetch_optional(pool)
.await
.map_err(map_sqlx_error)?;
(row.map(|r| r.get::<i64, _>("suspended_at_ms")), Some(uuid))
} else {
(None, None)
};
let fetch_n: i64 = (limit as i64).saturating_add(1);
let rows = match (cursor_ms, cursor_uuid) {
(Some(last_ms), Some(last_uuid)) => {
sqlx::query(
"SELECT execution_id, suspended_at_ms, reason_code \
FROM ff_suspension_current \
WHERE partition_key = $1 \
AND (suspended_at_ms, execution_id) > ($2, $3) \
ORDER BY suspended_at_ms ASC, execution_id ASC \
LIMIT $4",
)
.bind(partition_idx)
.bind(last_ms)
.bind(last_uuid)
.bind(fetch_n)
.fetch_all(pool)
.await
}
(None, Some(last_uuid)) => {
sqlx::query(
"SELECT execution_id, suspended_at_ms, reason_code \
FROM ff_suspension_current \
WHERE partition_key = $1 AND execution_id > $2 \
ORDER BY suspended_at_ms ASC, execution_id ASC \
LIMIT $3",
)
.bind(partition_idx)
.bind(last_uuid)
.bind(fetch_n)
.fetch_all(pool)
.await
}
_ => {
sqlx::query(
"SELECT execution_id, suspended_at_ms, reason_code \
FROM ff_suspension_current \
WHERE partition_key = $1 \
ORDER BY suspended_at_ms ASC, execution_id ASC \
LIMIT $2",
)
.bind(partition_idx)
.bind(fetch_n)
.fetch_all(pool)
.await
}
}
.map_err(map_sqlx_error)?;
let has_more = rows.len() > limit;
let take = if has_more { limit } else { rows.len() };
let mut entries: Vec<SuspendedExecutionEntry> = Vec::with_capacity(take);
for row in rows.iter().take(take) {
let uuid: Uuid = row.get("execution_id");
let suspended_at_ms: i64 = row.get("suspended_at_ms");
let reason: Option<String> = row.get("reason_code");
let eid = build_execution_id(parsed.index, uuid)?;
entries.push(SuspendedExecutionEntry::new(
eid,
suspended_at_ms,
reason.unwrap_or_default(),
));
}
let next_cursor = if has_more {
entries.last().map(|e| e.execution_id.clone())
} else {
None
};
Ok(ListSuspendedPage::new(entries, next_cursor))
}
/// Extracts the UUID part of an execution id shaped like `{fp:<partition>}:<uuid>`.
///
/// `op` is the calling operation's name and is used as the prefix of every
/// error message. Whatever sits between `{fp:` and the first `}:` is skipped
/// without being inspected.
///
/// # Errors
///
/// Returns `EngineError::Validation` with `ValidationKind::InvalidInput` when
/// the `{fp:` prefix or the `}:` delimiter is missing, or when the remainder
/// is not a valid UUID.
fn parse_execution_uuid(id: &ExecutionId, op: &'static str) -> Result<Uuid, EngineError> {
    // Every failure here is the same error variant; only the text differs.
    let invalid = |detail: String| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail,
    };

    let s = id.as_str();
    let after_prefix = match s.strip_prefix("{fp:") {
        Some(tail) => tail,
        None => return Err(invalid(format!("{op}: cursor missing '{{fp:' prefix: {s}"))),
    };
    // Split at the first "}:"; the text after it is the UUID candidate.
    let uuid_str = match after_prefix.split_once("}:") {
        Some((_, tail)) => tail,
        None => return Err(invalid(format!("{op}: cursor missing '}}:' delimiter: {s}"))),
    };

    match Uuid::parse_str(uuid_str) {
        Ok(uuid) => Ok(uuid),
        Err(e) => Err(invalid(format!("{op}: cursor UUID invalid: {e}"))),
    }
}
/// Builds an `ExecutionId` from a partition index and a row UUID by rendering
/// `{fp:<partition_idx>}:<uuid>` and handing the text to `ExecutionId::parse`.
///
/// # Errors
///
/// Returns `EngineError::Validation` with `ValidationKind::Corruption` when the
/// rendered text is rejected by `ExecutionId::parse`.
fn build_execution_id(partition_idx: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
    let s = format!("{{fp:{partition_idx}}}:{uuid}");
    match ExecutionId::parse(&s) {
        Ok(execution_id) => Ok(execution_id),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!(
                "list_suspended: ff_suspension_current row produced invalid ExecutionId '{s}': {e:?}"
            ),
        }),
    }
}